This is an automated email from the ASF dual-hosted git repository.

eolivelli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new 16a4365d7d Use BlockingQueue.drainTo() in Journal thread (#3544)
16a4365d7d is described below

commit 16a4365d7dcaafb2b0fc5ae6d43b4bee0af78b98
Author: Matteo Merli <[email protected]>
AuthorDate: Thu Oct 20 08:12:03 2022 -0700

    Use BlockingQueue.drainTo() in Journal thread (#3544)
---
 .../java/org/apache/bookkeeper/bookie/Journal.java | 44 +++++++++++++++++-----
 1 file changed, 35 insertions(+), 9 deletions(-)

diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
index a3e0086b69..d28c1cac09 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
@@ -36,6 +36,7 @@ import java.io.FileOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
+import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
@@ -1071,6 +1072,8 @@ public class Journal extends BookieCriticalThread implements CheckpointSource {
             long lastFlushTimeMs = System.currentTimeMillis();
 
             long busyStartTime = System.nanoTime();
+            ArrayDeque<QueueEntry> localQueueEntries = new ArrayDeque<>();
+
             QueueEntry qe = null;
             while (true) {
                 // new journal file to write
@@ -1103,22 +1106,45 @@ public class Journal extends BookieCriticalThread implements CheckpointSource {
                             
.registerSuccessfulEvent(MathUtils.elapsedNanos(dequeueStartTime), 
TimeUnit.NANOSECONDS);
                     }
 
-                    if (numEntriesToFlush == 0) {
+                    if (numEntriesToFlush == 0 && localQueueEntries.isEmpty()) {
+                        queue.drainTo(localQueueEntries);
+
                         
journalTime.addLatency(MathUtils.elapsedNanos(busyStartTime), 
TimeUnit.NANOSECONDS);
-                        qe = queue.take();
+                        if (!localQueueEntries.isEmpty()) {
+                            qe = localQueueEntries.removeFirst();
+                        } else {
+                            qe = queue.take();
+                        }
+
                         dequeueStartTime = MathUtils.nowInNano();
                         busyStartTime = dequeueStartTime;
                         journalStats.getJournalQueueSize().dec();
                         journalStats.getJournalQueueStats()
-                            
.registerSuccessfulEvent(MathUtils.elapsedNanos(qe.enqueueTime), 
TimeUnit.NANOSECONDS);
+                                
.registerSuccessfulEvent(MathUtils.elapsedNanos(qe.enqueueTime),
+                                        TimeUnit.NANOSECONDS);
                     } else {
-                        long pollWaitTimeNanos = maxGroupWaitInNanos
-                                - 
MathUtils.elapsedNanos(toFlush.get(0).enqueueTime);
-                        if (flushWhenQueueEmpty || pollWaitTimeNanos < 0) {
-                            pollWaitTimeNanos = 0;
+                        if (localQueueEntries.isEmpty()) {
+                            queue.drainTo(localQueueEntries);
+                        }
+
+                        if (!localQueueEntries.isEmpty()) {
+                            
journalTime.addLatency(MathUtils.elapsedNanos(busyStartTime), 
TimeUnit.NANOSECONDS);
+                            qe = localQueueEntries.removeFirst();
+                            dequeueStartTime = MathUtils.nowInNano();
+                            busyStartTime = dequeueStartTime;
+                            journalStats.getJournalQueueSize().dec();
+                            journalStats.getJournalQueueStats()
+                                    
.registerSuccessfulEvent(MathUtils.elapsedNanos(qe.enqueueTime),
+                                            TimeUnit.NANOSECONDS);
+                        } else {
+                            long pollWaitTimeNanos = maxGroupWaitInNanos
+                                    - 
MathUtils.elapsedNanos(toFlush.get(0).enqueueTime);
+                            if (flushWhenQueueEmpty || pollWaitTimeNanos < 0) {
+                                pollWaitTimeNanos = 0;
+                            }
+                            qe = queue.poll(pollWaitTimeNanos, 
TimeUnit.NANOSECONDS);
+                            dequeueStartTime = MathUtils.nowInNano();
                         }
-                        qe = queue.poll(pollWaitTimeNanos, 
TimeUnit.NANOSECONDS);
-                        dequeueStartTime = MathUtils.nowInNano();
 
                         if (qe != null) {
                             journalStats.getJournalQueueSize().dec();

Reply via email to