This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new 97e3bb1962 Use BatchedArrayBlockingQueue in Journal (#3843)
97e3bb1962 is described below

commit 97e3bb1962ad253c683494a7f5c362a4f974511f
Author: Matteo Merli <mme...@apache.org>
AuthorDate: Tue Mar 21 08:03:28 2023 -0700

    Use BatchedArrayBlockingQueue in Journal (#3843)
    
    * Use BatchedArrayBlockingQueue in Journal
    
    # Conflicts:
    #       bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
    
    * Remove unnecessary imports.
    
    * Address the comments.
    
    * Fix the CI problems.
    
    ---------
    
    Co-authored-by: horizonzy <horizo...@apache.org>
---
 .../java/org/apache/bookkeeper/bookie/Journal.java | 322 ++++++++++-----------
 .../bookkeeper/bookie/BookieJournalForceTest.java  |  21 +-
 .../bookie/BookieJournalPageCacheFlushTest.java    |  24 +-
 3 files changed, 174 insertions(+), 193 deletions(-)
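
The heart of the change is visible in the diff below: the Journal's producer/consumer
hand-off moves from drainTo()/take() on a java.util.concurrent queue to the bulk
takeAll()/pollAll() primitives of BatchedArrayBlockingQueue, which move a whole batch of
entries per queue operation. A minimal consumer sketch of that pattern, assuming
bookkeeper-common (which provides BatchedArrayBlockingQueue) is on the classpath; the
capacity and string payloads here are illustrative, not taken from the patch:

    import org.apache.bookkeeper.common.collections.BatchedArrayBlockingQueue;

    public class BatchedTakeSketch {
        public static void main(String[] args) throws InterruptedException {
            // Bounded queue; the Journal sizes its queues with conf.getJournalQueueSize().
            BatchedArrayBlockingQueue<String> queue = new BatchedArrayBlockingQueue<>(1024);
            queue.put("entry-1");
            queue.put("entry-2");

            // One blocking takeAll() replaces the old drainTo() + take() fallback:
            // it waits until at least one element is available, fills the
            // caller-supplied array from index 0, and returns the count moved.
            String[] local = new String[1024];
            int n = queue.takeAll(local);
            for (int i = 0; i < n; i++) {
                System.out.println(local[i]);
                local[i] = null; // clear the slot so drained items can be GC'd
            }
        }
    }

The patch likewise reuses one preallocated array per consumer thread (localRequests,
localQueueEntries) and nulls out slots after processing so drained entries do not linger.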

diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
index 50eb605414..4fd2c2f28d 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
@@ -38,16 +38,15 @@ import java.io.FileOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
-import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 import org.apache.bookkeeper.bookie.LedgerDirsManager.NoWritableLedgerDirException;
 import org.apache.bookkeeper.bookie.stats.JournalStats;
+import org.apache.bookkeeper.common.collections.BatchedArrayBlockingQueue;
+import org.apache.bookkeeper.common.collections.BatchedBlockingQueue;
 import org.apache.bookkeeper.common.collections.BlockingMpscQueue;
 import org.apache.bookkeeper.common.collections.RecyclableArrayList;
 import org.apache.bookkeeper.common.util.MemoryLimitController;
@@ -489,36 +488,32 @@ public class Journal extends BookieCriticalThread implements CheckpointSource {
                 }
             }
 
-            final List<ForceWriteRequest> localRequests = new ArrayList<>();
             final ObjectHashSet<BookieRequestHandler> writeHandlers = new ObjectHashSet<>();
+            final ForceWriteRequest[] localRequests = new ForceWriteRequest[conf.getJournalQueueSize()];
 
             while (running) {
                 try {
-                    int numReqInLastForceWrite = 0;
+                    int numEntriesInLastForceWrite = 0;
 
-                    int requestsCount = forceWriteRequests.drainTo(localRequests);
-                    if (requestsCount == 0) {
-                        ForceWriteRequest fwr = forceWriteRequests.take();
-                        localRequests.add(fwr);
-                        requestsCount = 1;
-                    }
+                    int requestsCount = forceWriteRequests.takeAll(localRequests);
 
                     journalStats.getForceWriteQueueSize().addCount(-requestsCount);
 
                     // Sync and mark the journal up to the position of the last entry in the batch
-                    ForceWriteRequest lastRequest = localRequests.get(requestsCount - 1);
+                    ForceWriteRequest lastRequest = localRequests[requestsCount - 1];
                     syncJournal(lastRequest);
 
                     // All the requests in the batch are now fully-synced. We can trigger sending the
                     // responses
                     for (int i = 0; i < requestsCount; i++) {
-                        ForceWriteRequest req = localRequests.get(i);
-                        numReqInLastForceWrite += req.process(writeHandlers);
+                        ForceWriteRequest req = localRequests[i];
+                        numEntriesInLastForceWrite += req.process(writeHandlers);
+                        localRequests[i] = null;
                         req.recycle();
                     }
 
                     journalStats.getForceWriteGroupingCountStats()
-                            .registerSuccessfulValue(numReqInLastForceWrite);
+                            .registerSuccessfulValue(numEntriesInLastForceWrite);
                     writeHandlers.forEach(
                             (ObjectProcedure<? super BookieRequestHandler>)
                                     BookieRequestHandler::flushPendingResponse);
@@ -529,16 +524,8 @@ public class Journal extends BookieCriticalThread implements CheckpointSource {
                 } catch (InterruptedException e) {
                     Thread.currentThread().interrupt();
                     LOG.info("ForceWrite thread interrupted");
-                    // close is idempotent
-                    if (!localRequests.isEmpty()) {
-                        ForceWriteRequest req = localRequests.get(localRequests.size() - 1);
-                        req.shouldClose = true;
-                        req.closeFileIfNecessary();
-                    }
                     running = false;
                 }
-
-                localRequests.clear();
             }
             // Regardless of what caused us to exit, we should notify the
             // the parent thread as it should either exit or be in the process
@@ -653,8 +640,8 @@ public class Journal extends BookieCriticalThread implements CheckpointSource {
     private static final String journalThreadName = "BookieJournal";
 
     // journal entry queue to commit
-    final BlockingQueue<QueueEntry> queue;
-    final BlockingQueue<ForceWriteRequest> forceWriteRequests;
+    final BatchedBlockingQueue<QueueEntry> queue;
+    final BatchedBlockingQueue<ForceWriteRequest> forceWriteRequests;
 
     volatile boolean running = true;
     private final LedgerDirsManager ledgerDirsManager;
@@ -684,8 +671,8 @@ public class Journal extends BookieCriticalThread implements CheckpointSource {
             queue = new BlockingMpscQueue<>(conf.getJournalQueueSize());
             forceWriteRequests = new BlockingMpscQueue<>(conf.getJournalQueueSize());
         } else {
-            queue = new ArrayBlockingQueue<>(conf.getJournalQueueSize());
-            forceWriteRequests = new ArrayBlockingQueue<>(conf.getJournalQueueSize());
+            queue = new BatchedArrayBlockingQueue<>(conf.getJournalQueueSize());
+            forceWriteRequests = new BatchedArrayBlockingQueue<>(conf.getJournalQueueSize());
         }
 
         // Adjust the journal max memory in case there are multiple journals configured.
@@ -990,15 +977,14 @@ public class Journal extends BookieCriticalThread implements CheckpointSource {
             long dequeueStartTime = 0L;
             long lastFlushTimeMs = System.currentTimeMillis();
 
-            long busyStartTime = System.nanoTime();
-            ArrayDeque<QueueEntry> localQueueEntries = new ArrayDeque<>();
             final ObjectHashSet<BookieRequestHandler> writeHandlers = new ObjectHashSet<>();
-
+            QueueEntry[] localQueueEntries = new QueueEntry[conf.getJournalQueueSize()];
+            int localQueueEntriesIdx = 0;
+            int localQueueEntriesLen = 0;
             QueueEntry qe = null;
             while (true) {
                 // new journal file to write
                 if (null == logFile) {
-
                     logId = logId + 1;
                     journalIds = listJournalIds(journalDirectory, null);
                     Long replaceLogId = fileChannelProvider.supportReuseFile() && journalReuseFiles
@@ -1026,155 +1012,147 @@ public class Journal extends BookieCriticalThread implements CheckpointSource {
                             .registerSuccessfulEvent(MathUtils.elapsedNanos(dequeueStartTime), TimeUnit.NANOSECONDS);
                     }
 
-                    if (numEntriesToFlush == 0 && localQueueEntries.isEmpty()) {
-                        queue.drainTo(localQueueEntries);
-
-                        journalTime.addLatency(MathUtils.elapsedNanos(busyStartTime), TimeUnit.NANOSECONDS);
-                        if (!localQueueEntries.isEmpty()) {
-                            qe = localQueueEntries.removeFirst();
-                        } else {
-                            qe = queue.take();
-                        }
-
-                        dequeueStartTime = MathUtils.nowInNano();
-                        busyStartTime = dequeueStartTime;
-                        journalStats.getJournalQueueSize().dec();
-                        journalStats.getJournalQueueStats()
-                                .registerSuccessfulEvent(MathUtils.elapsedNanos(qe.enqueueTime),
-                                        TimeUnit.NANOSECONDS);
+                    // At this point the local queue will always be empty, otherwise we would have
+                    // advanced to the next `qe` at the end of the loop
+                    localQueueEntriesIdx = 0;
+                    if (numEntriesToFlush == 0) {
+                        // There are no entries pending. We can wait indefinitely until the next
+                        // one is available
+                        localQueueEntriesLen = queue.takeAll(localQueueEntries);
                     } else {
-                        if (localQueueEntries.isEmpty()) {
-                            queue.drainTo(localQueueEntries);
+                        // There are already some entries pending. We must adjust
+                        // the waiting time to the remaining groupWait time
+                        long pollWaitTimeNanos = maxGroupWaitInNanos
+                                - MathUtils.elapsedNanos(toFlush.get(0).enqueueTime);
+                        if (flushWhenQueueEmpty || pollWaitTimeNanos < 0) {
+                            pollWaitTimeNanos = 0;
                         }
 
-                        if (!localQueueEntries.isEmpty()) {
-                            journalTime.addLatency(MathUtils.elapsedNanos(busyStartTime), TimeUnit.NANOSECONDS);
-                            qe = localQueueEntries.removeFirst();
-                            dequeueStartTime = MathUtils.nowInNano();
-                            busyStartTime = dequeueStartTime;
-                        } else {
-                            long pollWaitTimeNanos = maxGroupWaitInNanos
-                                    - MathUtils.elapsedNanos(toFlush.get(0).enqueueTime);
-                            if (flushWhenQueueEmpty || pollWaitTimeNanos < 0) {
-                                pollWaitTimeNanos = 0;
-                            }
-                            qe = queue.poll(pollWaitTimeNanos, TimeUnit.NANOSECONDS);
-                            dequeueStartTime = MathUtils.nowInNano();
-                        }
+                        localQueueEntriesLen = queue.pollAll(localQueueEntries,
+                                pollWaitTimeNanos, TimeUnit.NANOSECONDS);
+                    }
+
+                    dequeueStartTime = MathUtils.nowInNano();
 
-                        if (qe != null) {
-                            journalStats.getJournalQueueSize().dec();
-                            journalStats.getJournalQueueStats()
+                    if (localQueueEntriesLen > 0) {
+                        qe = localQueueEntries[localQueueEntriesIdx];
+                        localQueueEntries[localQueueEntriesIdx++] = null;
+                        journalStats.getJournalQueueSize().dec();
+                        journalStats.getJournalQueueStats()
                                 .registerSuccessfulEvent(MathUtils.elapsedNanos(qe.enqueueTime), TimeUnit.NANOSECONDS);
-                        }
+                    }
+                } else {
+                    journalStats.getJournalQueueSize().dec();
+                    journalStats.getJournalQueueStats()
+                            .registerSuccessfulEvent(MathUtils.elapsedNanos(qe.enqueueTime), TimeUnit.NANOSECONDS);
+                }
+                if (numEntriesToFlush > 0) {
+                    boolean shouldFlush = false;
+                    // We should issue a forceWrite if any of the three conditions below holds good
+                    // 1. If the oldest pending entry has been pending for longer than the max wait time
+                    if (maxGroupWaitInNanos > 0 && !groupWhenTimeout && (MathUtils
+                            .elapsedNanos(toFlush.get(0).enqueueTime) > maxGroupWaitInNanos)) {
+                        groupWhenTimeout = true;
+                    } else if (maxGroupWaitInNanos > 0 && groupWhenTimeout
+                        && (qe == null // no entry to group
+                            || MathUtils.elapsedNanos(qe.enqueueTime) < maxGroupWaitInNanos)) {
+                        // when group timeout, it would be better to look forward, as there might be lots of
+                        // entries already timeout
+                        // due to a previous slow write (writing to filesystem which impacted by force write).
+                        // Group those entries in the queue
+                        // a) already timeout
+                        // b) limit the number of entries to group
+                        groupWhenTimeout = false;
+                        shouldFlush = true;
+                        journalStats.getFlushMaxWaitCounter().inc();
+                    } else if (qe != null
+                            && ((bufferedEntriesThreshold > 0 && toFlush.size() > bufferedEntriesThreshold)
+                            || (bc.position() > lastFlushPosition + bufferedWritesThreshold))) {
+                        // 2. If we have buffered more than the buffWriteThreshold or bufferedEntriesThreshold
+                        groupWhenTimeout = false;
+                        shouldFlush = true;
+                        journalStats.getFlushMaxOutstandingBytesCounter().inc();
+                    } else if (qe == null && flushWhenQueueEmpty) {
+                        // We should get here only if we flushWhenQueueEmpty is true else we would wait
+                        // for timeout that would put is past the maxWait threshold
+                        // 3. If the queue is empty i.e. no benefit of grouping. This happens when we have one
+                        // publish at a time - common case in tests.
+                        groupWhenTimeout = false;
+                        shouldFlush = true;
+                        journalStats.getFlushEmptyQueueCounter().inc();
+                    }
 
-                        boolean shouldFlush = false;
-                        // We should issue a forceWrite if any of the three conditions below holds good
-                        // 1. If the oldest pending entry has been pending for longer than the max wait time
-                        if (maxGroupWaitInNanos > 0 && !groupWhenTimeout && (MathUtils
-                                .elapsedNanos(toFlush.get(0).enqueueTime) > maxGroupWaitInNanos)) {
-                            groupWhenTimeout = true;
-                        } else if (maxGroupWaitInNanos > 0 && groupWhenTimeout
-                            && (qe == null // no entry to group
-                                || MathUtils.elapsedNanos(qe.enqueueTime) < maxGroupWaitInNanos)) {
-                            // when group timeout, it would be better to look forward, as there might be lots of
-                            // entries already timeout
-                            // due to a previous slow write (writing to filesystem which impacted by force write).
-                            // Group those entries in the queue
-                            // a) already timeout
-                            // b) limit the number of entries to group
-                            groupWhenTimeout = false;
-                            shouldFlush = true;
-                            journalStats.getFlushMaxWaitCounter().inc();
-                        } else if (qe != null
-                                && ((bufferedEntriesThreshold > 0 && toFlush.size() > bufferedEntriesThreshold)
-                                || (bc.position() > lastFlushPosition + bufferedWritesThreshold))) {
-                            // 2. If we have buffered more than the buffWriteThreshold or bufferedEntriesThreshold
-                            groupWhenTimeout = false;
-                            shouldFlush = true;
-                            journalStats.getFlushMaxOutstandingBytesCounter().inc();
-                        } else if (qe == null && flushWhenQueueEmpty) {
-                            // We should get here only if we flushWhenQueueEmpty is true else we would wait
-                            // for timeout that would put is past the maxWait threshold
-                            // 3. If the queue is empty i.e. no benefit of grouping. This happens when we have one
-                            // publish at a time - common case in tests.
-                            groupWhenTimeout = false;
-                            shouldFlush = true;
-                            journalStats.getFlushEmptyQueueCounter().inc();
+                    // toFlush is non null and not empty so should be safe to access getFirst
+                    if (shouldFlush) {
+                        if (journalFormatVersionToWrite >= JournalChannel.V5) {
+                            writePaddingBytes(logFile, paddingBuff, journalAlignmentSize);
                         }
-
-                        // toFlush is non null and not empty so should be safe to access getFirst
-                        if (shouldFlush) {
-                            if (journalFormatVersionToWrite >= JournalChannel.V5) {
-                                writePaddingBytes(logFile, paddingBuff, journalAlignmentSize);
-                            }
-                            journalFlushWatcher.reset().start();
-                            bc.flush();
-
-                            for (int i = 0; i < toFlush.size(); i++) {
-                                QueueEntry entry = toFlush.get(i);
-                                if (entry != null && (!syncData || entry.ackBeforeSync)) {
-                                    toFlush.set(i, null);
-                                    numEntriesToFlush--;
-                                    if (entry.getCtx() instanceof BookieRequestHandler
-                                            && entry.entryId != BookieImpl.METAENTRY_ID_FORCE_LEDGER) {
-                                        writeHandlers.add((BookieRequestHandler) entry.getCtx());
-                                    }
-                                    entry.run();
+                        journalFlushWatcher.reset().start();
+                        bc.flush();
+
+                        for (int i = 0; i < toFlush.size(); i++) {
+                            QueueEntry entry = toFlush.get(i);
+                            if (entry != null && (!syncData || entry.ackBeforeSync)) {
+                                toFlush.set(i, null);
+                                numEntriesToFlush--;
+                                if (entry.getCtx() instanceof BookieRequestHandler
+                                        && entry.entryId != BookieImpl.METAENTRY_ID_FORCE_LEDGER) {
+                                    writeHandlers.add((BookieRequestHandler) entry.getCtx());
                                 }
+                                entry.run();
                             }
-                            writeHandlers.forEach(
-                                    (ObjectProcedure<? super BookieRequestHandler>)
-                                            BookieRequestHandler::flushPendingResponse);
-                            writeHandlers.clear();
-                            lastFlushPosition = bc.position();
-                            journalStats.getJournalFlushStats().registerSuccessfulEvent(
-                                    journalFlushWatcher.stop().elapsed(TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS);
-
-                            // Trace the lifetime of entries through persistence
-                            if (LOG.isDebugEnabled()) {
-                                for (QueueEntry e : toFlush) {
-                                    if (e != null && LOG.isDebugEnabled()) {
-                                        LOG.debug("Written and queuing for flush Ledger: {}  Entry: {}",
-                                                  e.ledgerId, e.entryId);
-                                    }
+                        }
+                        writeHandlers.forEach(
+                                (ObjectProcedure<? super BookieRequestHandler>)
+                                        BookieRequestHandler::flushPendingResponse);
+                        writeHandlers.clear();
+
+                        lastFlushPosition = bc.position();
+                        journalStats.getJournalFlushStats().registerSuccessfulEvent(
+                                journalFlushWatcher.stop().elapsed(TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS);
+
+                        // Trace the lifetime of entries through persistence
+                        if (LOG.isDebugEnabled()) {
+                            for (QueueEntry e : toFlush) {
+                                if (e != null && LOG.isDebugEnabled()) {
+                                    LOG.debug("Written and queuing for flush Ledger: {}  Entry: {}",
+                                              e.ledgerId, e.entryId);
                                 }
                             }
+                        }
 
-                            journalStats.getForceWriteBatchEntriesStats()
-                                .registerSuccessfulValue(numEntriesToFlush);
-                            journalStats.getForceWriteBatchBytesStats()
-                                .registerSuccessfulValue(batchSize);
-
-                            boolean shouldRolloverJournal = (lastFlushPosition > maxJournalSize);
-                            // Trigger data sync to disk in the "Force-Write" thread.
-                            // Trigger data sync to disk has three situations:
-                            // 1. journalSyncData enabled, usually for SSD used as journal storage
-                            // 2. shouldRolloverJournal is true, that is the journal file reaches maxJournalSize
-                            // 3. if journalSyncData disabled and shouldRolloverJournal is false, we can use
-                            //   journalPageCacheFlushIntervalMSec to control sync frequency, preventing disk
-                            //   synchronize frequently, which will increase disk io util.
-                            //   when flush interval reaches journalPageCacheFlushIntervalMSec (default: 1s),
-                            //   it will trigger data sync to disk
-                            if (syncData
-                                    || shouldRolloverJournal
-                                    || (System.currentTimeMillis() - lastFlushTimeMs
-                                    >= journalPageCacheFlushIntervalMSec)) {
-                                forceWriteRequests.put(createForceWriteRequest(logFile, logId, lastFlushPosition,
-                                        toFlush, shouldRolloverJournal));
-                                lastFlushTimeMs = System.currentTimeMillis();
-                            }
-                            toFlush = entryListRecycler.newInstance();
-                            numEntriesToFlush = 0;
-
-                            batchSize = 0L;
-                            // check whether journal file is over file limit
-                            if (shouldRolloverJournal) {
-                                // if the journal file is rolled over, the journal file will be closed after last
-                                // entry is force written to disk.
-                                logFile = null;
-                                continue;
-                            }
+                        journalStats.getForceWriteBatchEntriesStats()
+                            .registerSuccessfulValue(numEntriesToFlush);
+                        journalStats.getForceWriteBatchBytesStats()
+                            .registerSuccessfulValue(batchSize);
+                        boolean shouldRolloverJournal = (lastFlushPosition > maxJournalSize);
+                        // Trigger data sync to disk in the "Force-Write" thread.
+                        // Trigger data sync to disk has three situations:
+                        // 1. journalSyncData enabled, usually for SSD used as journal storage
+                        // 2. shouldRolloverJournal is true, that is the journal file reaches maxJournalSize
+                        // 3. if journalSyncData disabled and shouldRolloverJournal is false, we can use
+                        //   journalPageCacheFlushIntervalMSec to control sync frequency, preventing disk
+                        //   synchronize frequently, which will increase disk io util.
+                        //   when flush interval reaches journalPageCacheFlushIntervalMSec (default: 1s),
+                        //   it will trigger data sync to disk
+                        if (syncData
+                                || shouldRolloverJournal
+                                || (System.currentTimeMillis() - lastFlushTimeMs
+                                >= journalPageCacheFlushIntervalMSec)) {
+                            forceWriteRequests.put(createForceWriteRequest(logFile, logId, lastFlushPosition,
+                                    toFlush, shouldRolloverJournal));
+                            lastFlushTimeMs = System.currentTimeMillis();
+                        }
+                        toFlush = entryListRecycler.newInstance();
+                        numEntriesToFlush = 0;
+
+                        batchSize = 0L;
+                        // check whether journal file is over file limit
+                        if (shouldRolloverJournal) {
+                            // if the journal file is rolled over, the journal file will be closed after last
+                            // entry is force written to disk.
+                            logFile = null;
+                            continue;
                         }
                     }
                 }
@@ -1218,7 +1196,13 @@ public class Journal extends BookieCriticalThread implements CheckpointSource {
 
                 toFlush.add(qe);
                 numEntriesToFlush++;
-                qe = null;
+
+                if (localQueueEntriesIdx < localQueueEntriesLen) {
+                    qe = localQueueEntries[localQueueEntriesIdx];
+                    localQueueEntries[localQueueEntriesIdx++] = null;
+                } else {
+                    qe = null;
+                }
             }
         } catch (IOException ioe) {
             LOG.error("I/O exception in Journal thread!", ioe);
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieJournalForceTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieJournalForceTest.java
index 2ed3a48efb..99845bd2d6 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieJournalForceTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieJournalForceTest.java
@@ -35,14 +35,13 @@ import static org.powermock.api.mockito.PowerMockito.whenNew;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
 import java.io.File;
-import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.bookie.Journal.ForceWriteRequest;
 import org.apache.bookkeeper.bookie.Journal.LastLogMark;
 import org.apache.bookkeeper.bookie.stats.JournalStats;
+import org.apache.bookkeeper.common.collections.BatchedArrayBlockingQueue;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.conf.TestBKConfiguration;
 import org.apache.bookkeeper.net.BookieId;
@@ -92,7 +91,7 @@ public class BookieJournalForceTest {
 
         // machinery to suspend ForceWriteThread
         CountDownLatch forceWriteThreadSuspendedLatch = new CountDownLatch(1);
-        LinkedBlockingQueue<ForceWriteRequest> supportQueue =
+        BatchedArrayBlockingQueue<ForceWriteRequest> supportQueue =
                 enableForceWriteThreadSuspension(forceWriteThreadSuspendedLatch, journal);
 
         journal.start();
@@ -303,20 +302,20 @@ public class BookieJournalForceTest {
     }
 
     @SuppressWarnings("unchecked")
-    private LinkedBlockingQueue<ForceWriteRequest> enableForceWriteThreadSuspension(
+    private BatchedArrayBlockingQueue<ForceWriteRequest> enableForceWriteThreadSuspension(
         CountDownLatch forceWriteThreadSuspendedLatch,
         Journal journal) throws InterruptedException {
-        LinkedBlockingQueue<ForceWriteRequest> supportQueue = new LinkedBlockingQueue<>();
-        BlockingQueue<ForceWriteRequest> forceWriteRequests = mock(BlockingQueue.class);
+        BatchedArrayBlockingQueue<ForceWriteRequest> supportQueue = new BatchedArrayBlockingQueue<>(10000);
+        BatchedArrayBlockingQueue<ForceWriteRequest> forceWriteRequests = mock(BatchedArrayBlockingQueue.class);
         doAnswer((Answer) (InvocationOnMock iom) -> {
             supportQueue.put(iom.getArgument(0));
             return null;
         }).when(forceWriteRequests).put(any(ForceWriteRequest.class));
-        when(forceWriteRequests.take()).thenAnswer(i -> {
-            // suspend the force write thread
+        doAnswer((Answer) (InvocationOnMock iom) -> {
             forceWriteThreadSuspendedLatch.await();
-            return supportQueue.take();
-        });
+            ForceWriteRequest[] array = iom.getArgument(0);
+            return supportQueue.takeAll(array);
+        }).when(forceWriteRequests).takeAll(any());
         Whitebox.setInternalState(journal, "forceWriteRequests", forceWriteRequests);
         return supportQueue;
     }
@@ -338,7 +337,7 @@ public class BookieJournalForceTest {
 
         // machinery to suspend ForceWriteThread
         CountDownLatch forceWriteThreadSuspendedLatch = new CountDownLatch(1);
-        LinkedBlockingQueue<ForceWriteRequest> supportQueue =
+        BatchedArrayBlockingQueue<ForceWriteRequest> supportQueue =
                 enableForceWriteThreadSuspension(forceWriteThreadSuspendedLatch, journal);
         journal.start();
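
Because takeAll() blocks and fills a caller-supplied array, the test above stubs it with
doAnswer() and forwards the array to a real backing queue, replacing the old
when(take()).thenAnswer(...) form. A self-contained sketch of that stubbing pattern; the
class and variable names are illustrative, with Mockito and bookkeeper-common assumed on
the classpath:

    import static org.mockito.ArgumentMatchers.any;
    import static org.mockito.Mockito.doAnswer;
    import static org.mockito.Mockito.mock;

    import org.apache.bookkeeper.common.collections.BatchedArrayBlockingQueue;

    public class TakeAllStubSketch {
        @SuppressWarnings("unchecked")
        public static void main(String[] args) throws InterruptedException {
            BatchedArrayBlockingQueue<String> backing = new BatchedArrayBlockingQueue<>(16);
            backing.put("pending-request");

            BatchedArrayBlockingQueue<String> mocked = mock(BatchedArrayBlockingQueue.class);
            doAnswer(inv -> {
                String[] array = inv.getArgument(0); // the caller's reusable buffer
                return backing.takeAll(array);       // delegate to the real queue
            }).when(mocked).takeAll(any());

            String[] buf = new String[16];
            System.out.println("took " + mocked.takeAll(buf) + ": " + buf[0]);
        }
    }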
 
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieJournalPageCacheFlushTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieJournalPageCacheFlushTest.java
index 02b151feed..7b169561f7 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieJournalPageCacheFlushTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieJournalPageCacheFlushTest.java
@@ -28,19 +28,17 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
 import static org.powermock.api.mockito.PowerMockito.whenNew;
 
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
 import java.io.File;
-import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.bookie.Journal.ForceWriteRequest;
 import org.apache.bookkeeper.bookie.Journal.LastLogMark;
+import org.apache.bookkeeper.common.collections.BatchedArrayBlockingQueue;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.conf.TestBKConfiguration;
 import org.apache.bookkeeper.net.BookieId;
@@ -71,20 +69,20 @@ public class BookieJournalPageCacheFlushTest {
     public TemporaryFolder tempDir = new TemporaryFolder();
 
     @SuppressWarnings("unchecked")
-    private LinkedBlockingQueue<ForceWriteRequest> enableForceWriteThreadSuspension(
+    private BatchedArrayBlockingQueue<ForceWriteRequest> enableForceWriteThreadSuspension(
             CountDownLatch forceWriteThreadSuspendedLatch,
             Journal journal) throws InterruptedException {
-        LinkedBlockingQueue<ForceWriteRequest> supportQueue = new LinkedBlockingQueue<>();
-        BlockingQueue<ForceWriteRequest> forceWriteRequests = mock(BlockingQueue.class);
+        BatchedArrayBlockingQueue<ForceWriteRequest> supportQueue = new BatchedArrayBlockingQueue<>(10000);
+        BatchedArrayBlockingQueue<ForceWriteRequest> forceWriteRequests = mock(BatchedArrayBlockingQueue.class);
         doAnswer((Answer) (InvocationOnMock iom) -> {
             supportQueue.put(iom.getArgument(0));
             return null;
         }).when(forceWriteRequests).put(any(ForceWriteRequest.class));
-        when(forceWriteRequests.take()).thenAnswer(i -> {
-            // suspend the force write thread
+        doAnswer((Answer) (InvocationOnMock iom) -> {
             forceWriteThreadSuspendedLatch.await();
-            return supportQueue.take();
-        });
+            ForceWriteRequest[] array = iom.getArgument(0);
+            return supportQueue.takeAll(array);
+        }).when(forceWriteRequests).takeAll(any());
         Whitebox.setInternalState(journal, "forceWriteRequests", forceWriteRequests);
         return supportQueue;
     }
@@ -108,7 +106,7 @@ public class BookieJournalPageCacheFlushTest {
         Journal journal = new Journal(0, journalDir, conf, ledgerDirsManager);
 
         CountDownLatch forceWriteThreadSuspendedLatch = new CountDownLatch(1);
-        LinkedBlockingQueue<ForceWriteRequest> supportQueue =
+        BatchedArrayBlockingQueue<ForceWriteRequest> supportQueue =
                 enableForceWriteThreadSuspension(forceWriteThreadSuspendedLatch, journal);
         journal.start();
 
@@ -175,7 +173,7 @@ public class BookieJournalPageCacheFlushTest {
         Journal journal = new Journal(0, journalDir, conf, ledgerDirsManager);
 
         CountDownLatch forceWriteThreadSuspendedLatch = new CountDownLatch(1);
-        LinkedBlockingQueue<ForceWriteRequest> supportQueue =
+        BatchedArrayBlockingQueue<ForceWriteRequest> supportQueue =
                 enableForceWriteThreadSuspension(forceWriteThreadSuspendedLatch, journal);
         journal.start();
 
@@ -238,7 +236,7 @@ public class BookieJournalPageCacheFlushTest {
         Journal journal = new Journal(0, journalDir, conf, ledgerDirsManager);
 
         CountDownLatch forceWriteThreadSuspendedLatch = new CountDownLatch(1);
-        LinkedBlockingQueue<ForceWriteRequest> supportQueue =
+        BatchedArrayBlockingQueue<ForceWriteRequest> supportQueue =
                 enableForceWriteThreadSuspension(forceWriteThreadSuspendedLatch, journal);
         journal.start();
 

