This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch branch-4.16
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/branch-4.16 by this push:
     new 2a7fae2090 Use BlockingQueue.drainTo() in JournalForceWrite thread (#3545)
2a7fae2090 is described below

commit 2a7fae20900626a5f841239d95f5ad1035e7cd8b
Author: Matteo Merli <[email protected]>
AuthorDate: Tue Oct 18 17:03:26 2022 -0700

    Use BlockingQueue.drainTo() in JournalForceWrite thread (#3545)
---
 .../java/org/apache/bookkeeper/bookie/Journal.java | 149 ++++++++++++---------
 1 file changed, 86 insertions(+), 63 deletions(-)

diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
index 69a0ac8566..a3e0086b69 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
@@ -487,6 +487,11 @@ public class Journal extends BookieCriticalThread implements CheckpointSource {
         // should we group force writes
         private final boolean enableGroupForceWrites;
         private final Counter forceWriteThreadTime;
+
+        boolean shouldForceWrite = true;
+        int numReqInLastForceWrite = 0;
+        boolean forceWriteMarkerSent = false;
+
         public ForceWriteThread(Thread threadToNotifyOnEx,
                                 boolean enableGroupForceWrites,
                                 StatsLogger statsLogger) {
@@ -508,90 +513,108 @@ public class Journal extends BookieCriticalThread implements CheckpointSource {
                 }
             }
 
-            boolean shouldForceWrite = true;
-            int numReqInLastForceWrite = 0;
             long busyStartTime = System.nanoTime();
-            boolean forceWriteMarkerSent = false;
+
+            List<ForceWriteRequest> localRequests = new ArrayList<>();
+
             while (running) {
-                ForceWriteRequest req = null;
                 try {
-                    forceWriteThreadTime.addLatency(MathUtils.elapsedNanos(busyStartTime), TimeUnit.NANOSECONDS);
-                    req = forceWriteRequests.take();
-                    busyStartTime = System.nanoTime();
-                    // Force write the file and then notify the write completions
-                    //
-                    if (!req.isMarker) {
-                        if (shouldForceWrite) {
-                            // if we are going to force write, any request that is already in the
-                            // queue will benefit from this force write - post a marker prior to issuing
-                            // the flush so until this marker is encountered we can skip the force write
-                            if (enableGroupForceWrites) {
-                                ForceWriteRequest marker =
-                                    createForceWriteRequest(req.logFile, 0, 0, null, false, true);
-                                forceWriteMarkerSent = forceWriteRequests.offer(marker);
-                                if (!forceWriteMarkerSent) {
-                                    marker.recycle();
-                                    Counter failures = journalStats.getForceWriteGroupingFailures();
-                                    failures.inc();
-                                    LOG.error(
-                                        "Fail to send force write grouping marker,"
-                                        + " Journal.forceWriteRequests queue(capacity {}) is full,"
-                                        + " current failure counter is {}.",
-                                        conf.getJournalQueueSize(), failures.get());
-                                }
-                            }
 
-                            // If we are about to issue a write, record the number of requests in
-                            // the last force write and then reset the counter so we can accumulate
-                            // requests in the write we are about to issue
-                            if (numReqInLastForceWrite > 0) {
-                                journalStats.getForceWriteGroupingCountStats()
-                                    .registerSuccessfulValue(numReqInLastForceWrite);
-                                numReqInLastForceWrite = 0;
-                            }
-                        }
+                    int requestsCount = forceWriteRequests.drainTo(localRequests);
+                    if (requestsCount == 0) {
+                        ForceWriteRequest fwr = forceWriteRequests.take();
+                        localRequests.add(fwr);
+                        requestsCount = 1;
                     }
-                    numReqInLastForceWrite += req.process(shouldForceWrite);
-
-                    if (enableGroupForceWrites
-                            // if its a marker we should switch back to flushing
-                            && !req.isMarker
-                            // If group marker sending failed, we can't figure out which writes are
-                            // grouped in this force write. So, abandon it even if other writes could
-                            // be grouped. This should be extremely rare as, usually, queue size is
-                            // large enough to accommodate high flush frequencies.
-                            && forceWriteMarkerSent
-                            // This indicates that this is the last request in a given file
-                            // so subsequent requests will go to a different file so we should
-                            // flush on the next request
-                            && !req.shouldClose) {
-                        shouldForceWrite = false;
-                    } else {
-                        shouldForceWrite = true;
+
+                    for (int i = 0; i < requestsCount; i++) {
+                        forceWriteThreadTime.addLatency(MathUtils.elapsedNanos(busyStartTime), TimeUnit.NANOSECONDS);
+                        processForceWriteRequest(localRequests.get(i));
+                        busyStartTime = System.nanoTime();
                     }
-                } catch (IOException ioe) {
-                    LOG.error("I/O exception in ForceWrite thread", ioe);
-                    running = false;
                 } catch (InterruptedException e) {
                     Thread.currentThread().interrupt();
                     LOG.info("ForceWrite thread interrupted");
                     // close is idempotent
-                    if (null != req) {
+                    if (!localRequests.isEmpty()) {
+                        ForceWriteRequest req = localRequests.get(localRequests.size() - 1);
                         req.shouldClose = true;
                         req.closeFileIfNecessary();
                     }
                     running = false;
-                } finally {
-                    if (req != null) {
-                        req.recycle();
-                    }
                 }
+
+                localRequests.clear();
             }
             // Regardless of what caused us to exit, we should notify the
             // the parent thread as it should either exit or be in the process
             // of exiting else we will have write requests hang
             threadToNotifyOnEx.interrupt();
         }
+
+        private void processForceWriteRequest(ForceWriteRequest req) {
+            try {
+                // Force write the file and then notify the write completions
+                //
+                if (!req.isMarker) {
+                    if (shouldForceWrite) {
+                        // if we are going to force write, any request that is already in the
+                        // queue will benefit from this force write - post a marker prior to issuing
+                        // the flush so until this marker is encountered we can skip the force write
+                        if (enableGroupForceWrites) {
+                            ForceWriteRequest marker =
+                                    createForceWriteRequest(req.logFile, 0, 0, null, false, true);
+                            forceWriteMarkerSent = forceWriteRequests.offer(marker);
+                            if (!forceWriteMarkerSent) {
+                                marker.recycle();
+                                Counter failures = journalStats.getForceWriteGroupingFailures();
+                                failures.inc();
+                                LOG.error(
+                                        "Fail to send force write grouping marker,"
+                                                + " Journal.forceWriteRequests queue(capacity {}) is full,"
+                                                + " current failure counter is {}.",
+                                        conf.getJournalQueueSize(), failures.get());
+                            }
+                        }
+
+                        // If we are about to issue a write, record the number of requests in
+                        // the last force write and then reset the counter so we can accumulate
+                        // requests in the write we are about to issue
+                        if (numReqInLastForceWrite > 0) {
+                            journalStats.getForceWriteGroupingCountStats()
+                                    .registerSuccessfulValue(numReqInLastForceWrite);
+                            numReqInLastForceWrite = 0;
+                        }
+                    }
+                }
+                numReqInLastForceWrite += req.process(shouldForceWrite);
+
+                if (enableGroupForceWrites
+                        // if its a marker we should switch back to flushing
+                        && !req.isMarker
+                        // If group marker sending failed, we can't figure out which writes are
+                        // grouped in this force write. So, abandon it even if other writes could
+                        // be grouped. This should be extremely rare as, usually, queue size is
+                        // large enough to accommodate high flush frequencies.
+                        && forceWriteMarkerSent
+                        // This indicates that this is the last request in a given file
+                        // so subsequent requests will go to a different file so we should
+                        // flush on the next request
+                        && !req.shouldClose) {
+                    shouldForceWrite = false;
+                } else {
+                    shouldForceWrite = true;
+                }
+             } catch (IOException ioe) {
+                LOG.error("I/O exception in ForceWrite thread", ioe);
+                running = false;
+            } finally {
+                if (req != null) {
+                    req.recycle();
+                }
+            }
+        }
+
         // shutdown sync thread
         void shutdown() throws InterruptedException {
             running = false;

Reply via email to