This is an automated email from the ASF dual-hosted git repository. mmerli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/master by this push: new 97e3bb1962 Use BatchedArrayBlockingQueue in Journal (#3843) 97e3bb1962 is described below commit 97e3bb1962ad253c683494a7f5c362a4f974511f Author: Matteo Merli <mme...@apache.org> AuthorDate: Tue Mar 21 08:03:28 2023 -0700 Use BatchedArrayBlockingQueue in Journal (#3843) * Use BatchedArrayBlockingQueue in Journal # Conflicts: # bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java * remove unnecessary imports. * Address the comments. * fix the ci problems. --------- Co-authored-by: horizonzy <horizo...@apache.org> --- .../java/org/apache/bookkeeper/bookie/Journal.java | 322 ++++++++++----------- .../bookkeeper/bookie/BookieJournalForceTest.java | 21 +- .../bookie/BookieJournalPageCacheFlushTest.java | 24 +- 3 files changed, 174 insertions(+), 193 deletions(-) diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java index 50eb605414..4fd2c2f28d 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java @@ -38,16 +38,15 @@ import java.io.FileOutputStream; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.FileChannel; -import java.util.ArrayDeque; import java.util.ArrayList; import java.util.Collections; import java.util.List; -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.BlockingQueue; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import org.apache.bookkeeper.bookie.LedgerDirsManager.NoWritableLedgerDirException; import org.apache.bookkeeper.bookie.stats.JournalStats; +import org.apache.bookkeeper.common.collections.BatchedArrayBlockingQueue; +import org.apache.bookkeeper.common.collections.BatchedBlockingQueue; import org.apache.bookkeeper.common.collections.BlockingMpscQueue; import org.apache.bookkeeper.common.collections.RecyclableArrayList; import org.apache.bookkeeper.common.util.MemoryLimitController; @@ -489,36 +488,32 @@ public class Journal extends BookieCriticalThread implements CheckpointSource { } } - final List<ForceWriteRequest> localRequests = new ArrayList<>(); final ObjectHashSet<BookieRequestHandler> writeHandlers = new ObjectHashSet<>(); + final ForceWriteRequest[] localRequests = new ForceWriteRequest[conf.getJournalQueueSize()]; while (running) { try { - int numReqInLastForceWrite = 0; + int numEntriesInLastForceWrite = 0; - int requestsCount = forceWriteRequests.drainTo(localRequests); - if (requestsCount == 0) { - ForceWriteRequest fwr = forceWriteRequests.take(); - localRequests.add(fwr); - requestsCount = 1; - } + int requestsCount = forceWriteRequests.takeAll(localRequests); journalStats.getForceWriteQueueSize().addCount(-requestsCount); // Sync and mark the journal up to the position of the last entry in the batch - ForceWriteRequest lastRequest = localRequests.get(requestsCount - 1); + ForceWriteRequest lastRequest = localRequests[requestsCount - 1]; syncJournal(lastRequest); // All the requests in the batch are now fully-synced. We can trigger sending the // responses for (int i = 0; i < requestsCount; i++) { - ForceWriteRequest req = localRequests.get(i); - numReqInLastForceWrite += req.process(writeHandlers); + ForceWriteRequest req = localRequests[i]; + numEntriesInLastForceWrite += req.process(writeHandlers); + localRequests[i] = null; req.recycle(); } journalStats.getForceWriteGroupingCountStats() - .registerSuccessfulValue(numReqInLastForceWrite); + .registerSuccessfulValue(numEntriesInLastForceWrite); writeHandlers.forEach( (ObjectProcedure<? super BookieRequestHandler>) BookieRequestHandler::flushPendingResponse); @@ -529,16 +524,8 @@ public class Journal extends BookieCriticalThread implements CheckpointSource { } catch (InterruptedException e) { Thread.currentThread().interrupt(); LOG.info("ForceWrite thread interrupted"); - // close is idempotent - if (!localRequests.isEmpty()) { - ForceWriteRequest req = localRequests.get(localRequests.size() - 1); - req.shouldClose = true; - req.closeFileIfNecessary(); - } running = false; } - - localRequests.clear(); } // Regardless of what caused us to exit, we should notify the // the parent thread as it should either exit or be in the process @@ -653,8 +640,8 @@ public class Journal extends BookieCriticalThread implements CheckpointSource { private static final String journalThreadName = "BookieJournal"; // journal entry queue to commit - final BlockingQueue<QueueEntry> queue; - final BlockingQueue<ForceWriteRequest> forceWriteRequests; + final BatchedBlockingQueue<QueueEntry> queue; + final BatchedBlockingQueue<ForceWriteRequest> forceWriteRequests; volatile boolean running = true; private final LedgerDirsManager ledgerDirsManager; @@ -684,8 +671,8 @@ public class Journal extends BookieCriticalThread implements CheckpointSource { queue = new BlockingMpscQueue<>(conf.getJournalQueueSize()); forceWriteRequests = new BlockingMpscQueue<>(conf.getJournalQueueSize()); } else { - queue = new ArrayBlockingQueue<>(conf.getJournalQueueSize()); - forceWriteRequests = new ArrayBlockingQueue<>(conf.getJournalQueueSize()); + queue = new BatchedArrayBlockingQueue<>(conf.getJournalQueueSize()); + forceWriteRequests = new BatchedArrayBlockingQueue<>(conf.getJournalQueueSize()); } // Adjust the journal max memory in case there are multiple journals configured. @@ -990,15 +977,14 @@ public class Journal extends BookieCriticalThread implements CheckpointSource { long dequeueStartTime = 0L; long lastFlushTimeMs = System.currentTimeMillis(); - long busyStartTime = System.nanoTime(); - ArrayDeque<QueueEntry> localQueueEntries = new ArrayDeque<>(); final ObjectHashSet<BookieRequestHandler> writeHandlers = new ObjectHashSet<>(); - + QueueEntry[] localQueueEntries = new QueueEntry[conf.getJournalQueueSize()]; + int localQueueEntriesIdx = 0; + int localQueueEntriesLen = 0; QueueEntry qe = null; while (true) { // new journal file to write if (null == logFile) { - logId = logId + 1; journalIds = listJournalIds(journalDirectory, null); Long replaceLogId = fileChannelProvider.supportReuseFile() && journalReuseFiles @@ -1026,155 +1012,147 @@ public class Journal extends BookieCriticalThread implements CheckpointSource { .registerSuccessfulEvent(MathUtils.elapsedNanos(dequeueStartTime), TimeUnit.NANOSECONDS); } - if (numEntriesToFlush == 0 && localQueueEntries.isEmpty()) { - queue.drainTo(localQueueEntries); - - journalTime.addLatency(MathUtils.elapsedNanos(busyStartTime), TimeUnit.NANOSECONDS); - if (!localQueueEntries.isEmpty()) { - qe = localQueueEntries.removeFirst(); - } else { - qe = queue.take(); - } - - dequeueStartTime = MathUtils.nowInNano(); - busyStartTime = dequeueStartTime; - journalStats.getJournalQueueSize().dec(); - journalStats.getJournalQueueStats() - .registerSuccessfulEvent(MathUtils.elapsedNanos(qe.enqueueTime), - TimeUnit.NANOSECONDS); + // At this point the local queue will always be empty, otherwise we would have + // advanced to the next `qe` at the end of the loop + localQueueEntriesIdx = 0; + if (numEntriesToFlush == 0) { + // There are no entries pending. We can wait indefinitely until the next + // one is available + localQueueEntriesLen = queue.takeAll(localQueueEntries); } else { - if (localQueueEntries.isEmpty()) { - queue.drainTo(localQueueEntries); + // There are already some entries pending. We must adjust + // the waiting time to the remaining groupWait time + long pollWaitTimeNanos = maxGroupWaitInNanos + - MathUtils.elapsedNanos(toFlush.get(0).enqueueTime); + if (flushWhenQueueEmpty || pollWaitTimeNanos < 0) { + pollWaitTimeNanos = 0; } - if (!localQueueEntries.isEmpty()) { - journalTime.addLatency(MathUtils.elapsedNanos(busyStartTime), TimeUnit.NANOSECONDS); - qe = localQueueEntries.removeFirst(); - dequeueStartTime = MathUtils.nowInNano(); - busyStartTime = dequeueStartTime; - } else { - long pollWaitTimeNanos = maxGroupWaitInNanos - - MathUtils.elapsedNanos(toFlush.get(0).enqueueTime); - if (flushWhenQueueEmpty || pollWaitTimeNanos < 0) { - pollWaitTimeNanos = 0; - } - qe = queue.poll(pollWaitTimeNanos, TimeUnit.NANOSECONDS); - dequeueStartTime = MathUtils.nowInNano(); - } + localQueueEntriesLen = queue.pollAll(localQueueEntries, + pollWaitTimeNanos, TimeUnit.NANOSECONDS); + } + + dequeueStartTime = MathUtils.nowInNano(); - if (qe != null) { - journalStats.getJournalQueueSize().dec(); - journalStats.getJournalQueueStats() + if (localQueueEntriesLen > 0) { + qe = localQueueEntries[localQueueEntriesIdx]; + localQueueEntries[localQueueEntriesIdx++] = null; + journalStats.getJournalQueueSize().dec(); + journalStats.getJournalQueueStats() .registerSuccessfulEvent(MathUtils.elapsedNanos(qe.enqueueTime), TimeUnit.NANOSECONDS); - } + } + } else { + journalStats.getJournalQueueSize().dec(); + journalStats.getJournalQueueStats() + .registerSuccessfulEvent(MathUtils.elapsedNanos(qe.enqueueTime), TimeUnit.NANOSECONDS); + } + if (numEntriesToFlush > 0) { + boolean shouldFlush = false; + // We should issue a forceWrite if any of the three conditions below holds good + // 1. If the oldest pending entry has been pending for longer than the max wait time + if (maxGroupWaitInNanos > 0 && !groupWhenTimeout && (MathUtils + .elapsedNanos(toFlush.get(0).enqueueTime) > maxGroupWaitInNanos)) { + groupWhenTimeout = true; + } else if (maxGroupWaitInNanos > 0 && groupWhenTimeout + && (qe == null // no entry to group + || MathUtils.elapsedNanos(qe.enqueueTime) < maxGroupWaitInNanos)) { + // when group timeout, it would be better to look forward, as there might be lots of + // entries already timeout + // due to a previous slow write (writing to filesystem which impacted by force write). + // Group those entries in the queue + // a) already timeout + // b) limit the number of entries to group + groupWhenTimeout = false; + shouldFlush = true; + journalStats.getFlushMaxWaitCounter().inc(); + } else if (qe != null + && ((bufferedEntriesThreshold > 0 && toFlush.size() > bufferedEntriesThreshold) + || (bc.position() > lastFlushPosition + bufferedWritesThreshold))) { + // 2. If we have buffered more than the buffWriteThreshold or bufferedEntriesThreshold + groupWhenTimeout = false; + shouldFlush = true; + journalStats.getFlushMaxOutstandingBytesCounter().inc(); + } else if (qe == null && flushWhenQueueEmpty) { + // We should get here only if we flushWhenQueueEmpty is true else we would wait + // for timeout that would put is past the maxWait threshold + // 3. If the queue is empty i.e. no benefit of grouping. This happens when we have one + // publish at a time - common case in tests. + groupWhenTimeout = false; + shouldFlush = true; + journalStats.getFlushEmptyQueueCounter().inc(); + } - boolean shouldFlush = false; - // We should issue a forceWrite if any of the three conditions below holds good - // 1. If the oldest pending entry has been pending for longer than the max wait time - if (maxGroupWaitInNanos > 0 && !groupWhenTimeout && (MathUtils - .elapsedNanos(toFlush.get(0).enqueueTime) > maxGroupWaitInNanos)) { - groupWhenTimeout = true; - } else if (maxGroupWaitInNanos > 0 && groupWhenTimeout - && (qe == null // no entry to group - || MathUtils.elapsedNanos(qe.enqueueTime) < maxGroupWaitInNanos)) { - // when group timeout, it would be better to look forward, as there might be lots of - // entries already timeout - // due to a previous slow write (writing to filesystem which impacted by force write). - // Group those entries in the queue - // a) already timeout - // b) limit the number of entries to group - groupWhenTimeout = false; - shouldFlush = true; - journalStats.getFlushMaxWaitCounter().inc(); - } else if (qe != null - && ((bufferedEntriesThreshold > 0 && toFlush.size() > bufferedEntriesThreshold) - || (bc.position() > lastFlushPosition + bufferedWritesThreshold))) { - // 2. If we have buffered more than the buffWriteThreshold or bufferedEntriesThreshold - groupWhenTimeout = false; - shouldFlush = true; - journalStats.getFlushMaxOutstandingBytesCounter().inc(); - } else if (qe == null && flushWhenQueueEmpty) { - // We should get here only if we flushWhenQueueEmpty is true else we would wait - // for timeout that would put is past the maxWait threshold - // 3. If the queue is empty i.e. no benefit of grouping. This happens when we have one - // publish at a time - common case in tests. - groupWhenTimeout = false; - shouldFlush = true; - journalStats.getFlushEmptyQueueCounter().inc(); + // toFlush is non null and not empty so should be safe to access getFirst + if (shouldFlush) { + if (journalFormatVersionToWrite >= JournalChannel.V5) { + writePaddingBytes(logFile, paddingBuff, journalAlignmentSize); } - - // toFlush is non null and not empty so should be safe to access getFirst - if (shouldFlush) { - if (journalFormatVersionToWrite >= JournalChannel.V5) { - writePaddingBytes(logFile, paddingBuff, journalAlignmentSize); - } - journalFlushWatcher.reset().start(); - bc.flush(); - - for (int i = 0; i < toFlush.size(); i++) { - QueueEntry entry = toFlush.get(i); - if (entry != null && (!syncData || entry.ackBeforeSync)) { - toFlush.set(i, null); - numEntriesToFlush--; - if (entry.getCtx() instanceof BookieRequestHandler - && entry.entryId != BookieImpl.METAENTRY_ID_FORCE_LEDGER) { - writeHandlers.add((BookieRequestHandler) entry.getCtx()); - } - entry.run(); + journalFlushWatcher.reset().start(); + bc.flush(); + + for (int i = 0; i < toFlush.size(); i++) { + QueueEntry entry = toFlush.get(i); + if (entry != null && (!syncData || entry.ackBeforeSync)) { + toFlush.set(i, null); + numEntriesToFlush--; + if (entry.getCtx() instanceof BookieRequestHandler + && entry.entryId != BookieImpl.METAENTRY_ID_FORCE_LEDGER) { + writeHandlers.add((BookieRequestHandler) entry.getCtx()); } + entry.run(); } - writeHandlers.forEach( - (ObjectProcedure<? super BookieRequestHandler>) - BookieRequestHandler::flushPendingResponse); - writeHandlers.clear(); - lastFlushPosition = bc.position(); - journalStats.getJournalFlushStats().registerSuccessfulEvent( - journalFlushWatcher.stop().elapsed(TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS); - - // Trace the lifetime of entries through persistence - if (LOG.isDebugEnabled()) { - for (QueueEntry e : toFlush) { - if (e != null && LOG.isDebugEnabled()) { - LOG.debug("Written and queuing for flush Ledger: {} Entry: {}", - e.ledgerId, e.entryId); - } + } + writeHandlers.forEach( + (ObjectProcedure<? super BookieRequestHandler>) + BookieRequestHandler::flushPendingResponse); + writeHandlers.clear(); + + lastFlushPosition = bc.position(); + journalStats.getJournalFlushStats().registerSuccessfulEvent( + journalFlushWatcher.stop().elapsed(TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS); + + // Trace the lifetime of entries through persistence + if (LOG.isDebugEnabled()) { + for (QueueEntry e : toFlush) { + if (e != null && LOG.isDebugEnabled()) { + LOG.debug("Written and queuing for flush Ledger: {} Entry: {}", + e.ledgerId, e.entryId); } } + } - journalStats.getForceWriteBatchEntriesStats() - .registerSuccessfulValue(numEntriesToFlush); - journalStats.getForceWriteBatchBytesStats() - .registerSuccessfulValue(batchSize); - - boolean shouldRolloverJournal = (lastFlushPosition > maxJournalSize); - // Trigger data sync to disk in the "Force-Write" thread. - // Trigger data sync to disk has three situations: - // 1. journalSyncData enabled, usually for SSD used as journal storage - // 2. shouldRolloverJournal is true, that is the journal file reaches maxJournalSize - // 3. if journalSyncData disabled and shouldRolloverJournal is false, we can use - // journalPageCacheFlushIntervalMSec to control sync frequency, preventing disk - // synchronize frequently, which will increase disk io util. - // when flush interval reaches journalPageCacheFlushIntervalMSec (default: 1s), - // it will trigger data sync to disk - if (syncData - || shouldRolloverJournal - || (System.currentTimeMillis() - lastFlushTimeMs - >= journalPageCacheFlushIntervalMSec)) { - forceWriteRequests.put(createForceWriteRequest(logFile, logId, lastFlushPosition, - toFlush, shouldRolloverJournal)); - lastFlushTimeMs = System.currentTimeMillis(); - } - toFlush = entryListRecycler.newInstance(); - numEntriesToFlush = 0; - - batchSize = 0L; - // check whether journal file is over file limit - if (shouldRolloverJournal) { - // if the journal file is rolled over, the journal file will be closed after last - // entry is force written to disk. - logFile = null; - continue; - } + journalStats.getForceWriteBatchEntriesStats() + .registerSuccessfulValue(numEntriesToFlush); + journalStats.getForceWriteBatchBytesStats() + .registerSuccessfulValue(batchSize); + boolean shouldRolloverJournal = (lastFlushPosition > maxJournalSize); + // Trigger data sync to disk in the "Force-Write" thread. + // Trigger data sync to disk has three situations: + // 1. journalSyncData enabled, usually for SSD used as journal storage + // 2. shouldRolloverJournal is true, that is the journal file reaches maxJournalSize + // 3. if journalSyncData disabled and shouldRolloverJournal is false, we can use + // journalPageCacheFlushIntervalMSec to control sync frequency, preventing disk + // synchronize frequently, which will increase disk io util. + // when flush interval reaches journalPageCacheFlushIntervalMSec (default: 1s), + // it will trigger data sync to disk + if (syncData + || shouldRolloverJournal + || (System.currentTimeMillis() - lastFlushTimeMs + >= journalPageCacheFlushIntervalMSec)) { + forceWriteRequests.put(createForceWriteRequest(logFile, logId, lastFlushPosition, + toFlush, shouldRolloverJournal)); + lastFlushTimeMs = System.currentTimeMillis(); + } + toFlush = entryListRecycler.newInstance(); + numEntriesToFlush = 0; + + batchSize = 0L; + // check whether journal file is over file limit + if (shouldRolloverJournal) { + // if the journal file is rolled over, the journal file will be closed after last + // entry is force written to disk. + logFile = null; + continue; } } } @@ -1218,7 +1196,13 @@ public class Journal extends BookieCriticalThread implements CheckpointSource { toFlush.add(qe); numEntriesToFlush++; - qe = null; + + if (localQueueEntriesIdx < localQueueEntriesLen) { + qe = localQueueEntries[localQueueEntriesIdx]; + localQueueEntries[localQueueEntriesIdx++] = null; + } else { + qe = null; + } } } catch (IOException ioe) { LOG.error("I/O exception in Journal thread!", ioe); diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieJournalForceTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieJournalForceTest.java index 2ed3a48efb..99845bd2d6 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieJournalForceTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieJournalForceTest.java @@ -35,14 +35,13 @@ import static org.powermock.api.mockito.PowerMockito.whenNew; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import java.io.File; -import java.util.concurrent.BlockingQueue; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.bookie.Journal.ForceWriteRequest; import org.apache.bookkeeper.bookie.Journal.LastLogMark; import org.apache.bookkeeper.bookie.stats.JournalStats; +import org.apache.bookkeeper.common.collections.BatchedArrayBlockingQueue; import org.apache.bookkeeper.conf.ServerConfiguration; import org.apache.bookkeeper.conf.TestBKConfiguration; import org.apache.bookkeeper.net.BookieId; @@ -92,7 +91,7 @@ public class BookieJournalForceTest { // machinery to suspend ForceWriteThread CountDownLatch forceWriteThreadSuspendedLatch = new CountDownLatch(1); - LinkedBlockingQueue<ForceWriteRequest> supportQueue = + BatchedArrayBlockingQueue<ForceWriteRequest> supportQueue = enableForceWriteThreadSuspension(forceWriteThreadSuspendedLatch, journal); journal.start(); @@ -303,20 +302,20 @@ public class BookieJournalForceTest { } @SuppressWarnings("unchecked") - private LinkedBlockingQueue<ForceWriteRequest> enableForceWriteThreadSuspension( + private BatchedArrayBlockingQueue<ForceWriteRequest> enableForceWriteThreadSuspension( CountDownLatch forceWriteThreadSuspendedLatch, Journal journal) throws InterruptedException { - LinkedBlockingQueue<ForceWriteRequest> supportQueue = new LinkedBlockingQueue<>(); - BlockingQueue<ForceWriteRequest> forceWriteRequests = mock(BlockingQueue.class); + BatchedArrayBlockingQueue<ForceWriteRequest> supportQueue = new BatchedArrayBlockingQueue<>(10000); + BatchedArrayBlockingQueue<ForceWriteRequest> forceWriteRequests = mock(BatchedArrayBlockingQueue.class); doAnswer((Answer) (InvocationOnMock iom) -> { supportQueue.put(iom.getArgument(0)); return null; }).when(forceWriteRequests).put(any(ForceWriteRequest.class)); - when(forceWriteRequests.take()).thenAnswer(i -> { - // suspend the force write thread + doAnswer((Answer) (InvocationOnMock iom) -> { forceWriteThreadSuspendedLatch.await(); - return supportQueue.take(); - }); + ForceWriteRequest[] array = iom.getArgument(0); + return supportQueue.takeAll(array); + }).when(forceWriteRequests).takeAll(any()); Whitebox.setInternalState(journal, "forceWriteRequests", forceWriteRequests); return supportQueue; } @@ -338,7 +337,7 @@ public class BookieJournalForceTest { // machinery to suspend ForceWriteThread CountDownLatch forceWriteThreadSuspendedLatch = new CountDownLatch(1); - LinkedBlockingQueue<ForceWriteRequest> supportQueue = + BatchedArrayBlockingQueue<ForceWriteRequest> supportQueue = enableForceWriteThreadSuspension(forceWriteThreadSuspendedLatch, journal); journal.start(); diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieJournalPageCacheFlushTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieJournalPageCacheFlushTest.java index 02b151feed..7b169561f7 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieJournalPageCacheFlushTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieJournalPageCacheFlushTest.java @@ -28,19 +28,17 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; import static org.powermock.api.mockito.PowerMockito.whenNew; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import java.io.File; -import java.util.concurrent.BlockingQueue; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.bookie.Journal.ForceWriteRequest; import org.apache.bookkeeper.bookie.Journal.LastLogMark; +import org.apache.bookkeeper.common.collections.BatchedArrayBlockingQueue; import org.apache.bookkeeper.conf.ServerConfiguration; import org.apache.bookkeeper.conf.TestBKConfiguration; import org.apache.bookkeeper.net.BookieId; @@ -71,20 +69,20 @@ public class BookieJournalPageCacheFlushTest { public TemporaryFolder tempDir = new TemporaryFolder(); @SuppressWarnings("unchecked") - private LinkedBlockingQueue<ForceWriteRequest> enableForceWriteThreadSuspension( + private BatchedArrayBlockingQueue<ForceWriteRequest> enableForceWriteThreadSuspension( CountDownLatch forceWriteThreadSuspendedLatch, Journal journal) throws InterruptedException { - LinkedBlockingQueue<ForceWriteRequest> supportQueue = new LinkedBlockingQueue<>(); - BlockingQueue<ForceWriteRequest> forceWriteRequests = mock(BlockingQueue.class); + BatchedArrayBlockingQueue<ForceWriteRequest> supportQueue = new BatchedArrayBlockingQueue<>(10000); + BatchedArrayBlockingQueue<ForceWriteRequest> forceWriteRequests = mock(BatchedArrayBlockingQueue.class); doAnswer((Answer) (InvocationOnMock iom) -> { supportQueue.put(iom.getArgument(0)); return null; }).when(forceWriteRequests).put(any(ForceWriteRequest.class)); - when(forceWriteRequests.take()).thenAnswer(i -> { - // suspend the force write thread + doAnswer((Answer) (InvocationOnMock iom) -> { forceWriteThreadSuspendedLatch.await(); - return supportQueue.take(); - }); + ForceWriteRequest[] array = iom.getArgument(0); + return supportQueue.takeAll(array); + }).when(forceWriteRequests).takeAll(any()); Whitebox.setInternalState(journal, "forceWriteRequests", forceWriteRequests); return supportQueue; } @@ -108,7 +106,7 @@ public class BookieJournalPageCacheFlushTest { Journal journal = new Journal(0, journalDir, conf, ledgerDirsManager); CountDownLatch forceWriteThreadSuspendedLatch = new CountDownLatch(1); - LinkedBlockingQueue<ForceWriteRequest> supportQueue = + BatchedArrayBlockingQueue<ForceWriteRequest> supportQueue = enableForceWriteThreadSuspension(forceWriteThreadSuspendedLatch, journal); journal.start(); @@ -175,7 +173,7 @@ public class BookieJournalPageCacheFlushTest { Journal journal = new Journal(0, journalDir, conf, ledgerDirsManager); CountDownLatch forceWriteThreadSuspendedLatch = new CountDownLatch(1); - LinkedBlockingQueue<ForceWriteRequest> supportQueue = + BatchedArrayBlockingQueue<ForceWriteRequest> supportQueue = enableForceWriteThreadSuspension(forceWriteThreadSuspendedLatch, journal); journal.start(); @@ -238,7 +236,7 @@ public class BookieJournalPageCacheFlushTest { Journal journal = new Journal(0, journalDir, conf, ledgerDirsManager); CountDownLatch forceWriteThreadSuspendedLatch = new CountDownLatch(1); - LinkedBlockingQueue<ForceWriteRequest> supportQueue = + BatchedArrayBlockingQueue<ForceWriteRequest> supportQueue = enableForceWriteThreadSuspension(forceWriteThreadSuspendedLatch, journal); journal.start();