This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/master by this push:
new b63cca8bbb Use BlockingQueue.drainTo() in JournalForceWrite thread (#3545)
b63cca8bbb is described below
commit b63cca8bbb0208afa235abfdfc5f8dafd9eda2dd
Author: Matteo Merli <[email protected]>
AuthorDate: Tue Oct 18 17:03:26 2022 -0700
Use BlockingQueue.drainTo() in JournalForceWrite thread (#3545)
---
.../java/org/apache/bookkeeper/bookie/Journal.java | 149 ++++++++++++---------
1 file changed, 86 insertions(+), 63 deletions(-)
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
index 69a0ac8566..a3e0086b69 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
@@ -487,6 +487,11 @@ public class Journal extends BookieCriticalThread implements CheckpointSource {
// should we group force writes
private final boolean enableGroupForceWrites;
private final Counter forceWriteThreadTime;
+
+ boolean shouldForceWrite = true;
+ int numReqInLastForceWrite = 0;
+ boolean forceWriteMarkerSent = false;
+
public ForceWriteThread(Thread threadToNotifyOnEx,
boolean enableGroupForceWrites,
StatsLogger statsLogger) {
@@ -508,90 +513,108 @@ public class Journal extends BookieCriticalThread implements CheckpointSource {
}
}
- boolean shouldForceWrite = true;
- int numReqInLastForceWrite = 0;
long busyStartTime = System.nanoTime();
- boolean forceWriteMarkerSent = false;
+
+ List<ForceWriteRequest> localRequests = new ArrayList<>();
+
while (running) {
- ForceWriteRequest req = null;
try {
- forceWriteThreadTime.addLatency(MathUtils.elapsedNanos(busyStartTime), TimeUnit.NANOSECONDS);
- req = forceWriteRequests.take();
- busyStartTime = System.nanoTime();
- // Force write the file and then notify the write completions
- //
- if (!req.isMarker) {
- if (shouldForceWrite) {
- // if we are going to force write, any request that is already in the
- // queue will benefit from this force write - post a marker prior to issuing
- // the flush so until this marker is encountered we can skip the force write
- if (enableGroupForceWrites) {
- ForceWriteRequest marker =
- createForceWriteRequest(req.logFile, 0, 0, null, false, true);
- forceWriteMarkerSent = forceWriteRequests.offer(marker);
- if (!forceWriteMarkerSent) {
- marker.recycle();
- Counter failures = journalStats.getForceWriteGroupingFailures();
- failures.inc();
- LOG.error(
- "Fail to send force write grouping marker,"
- + " Journal.forceWriteRequests queue(capacity {}) is full,"
- + " current failure counter is {}.",
- conf.getJournalQueueSize(), failures.get());
- }
- }
- // If we are about to issue a write, record the number of requests in
- // the last force write and then reset the counter so we can accumulate
- // requests in the write we are about to issue
- if (numReqInLastForceWrite > 0) {
- journalStats.getForceWriteGroupingCountStats()
- .registerSuccessfulValue(numReqInLastForceWrite);
- numReqInLastForceWrite = 0;
- }
- }
+ int requestsCount = forceWriteRequests.drainTo(localRequests);
+ if (requestsCount == 0) {
+ ForceWriteRequest fwr = forceWriteRequests.take();
+ localRequests.add(fwr);
+ requestsCount = 1;
}
- numReqInLastForceWrite += req.process(shouldForceWrite);
-
- if (enableGroupForceWrites
- // if its a marker we should switch back to flushing
- && !req.isMarker
- // If group marker sending failed, we can't figure out which writes are
- // grouped in this force write. So, abandon it even if other writes could
- // be grouped. This should be extremely rare as, usually, queue size is
- // large enough to accommodate high flush frequencies.
- && forceWriteMarkerSent
- // This indicates that this is the last request in a given file
- // so subsequent requests will go to a different file so we should
- // flush on the next request
- && !req.shouldClose) {
- shouldForceWrite = false;
- } else {
- shouldForceWrite = true;
+
+ for (int i = 0; i < requestsCount; i++) {
+ forceWriteThreadTime.addLatency(MathUtils.elapsedNanos(busyStartTime), TimeUnit.NANOSECONDS);
+ processForceWriteRequest(localRequests.get(i));
+ busyStartTime = System.nanoTime();
}
- } catch (IOException ioe) {
- LOG.error("I/O exception in ForceWrite thread", ioe);
- running = false;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
LOG.info("ForceWrite thread interrupted");
// close is idempotent
- if (null != req) {
+ if (!localRequests.isEmpty()) {
+ ForceWriteRequest req = localRequests.get(localRequests.size() - 1);
req.shouldClose = true;
req.closeFileIfNecessary();
}
running = false;
- } finally {
- if (req != null) {
- req.recycle();
- }
}
+
+ localRequests.clear();
}
// Regardless of what caused us to exit, we should notify the
// the parent thread as it should either exit or be in the process
// of exiting else we will have write requests hang
threadToNotifyOnEx.interrupt();
}
+
+ private void processForceWriteRequest(ForceWriteRequest req) {
+ try {
+ // Force write the file and then notify the write completions
+ //
+ if (!req.isMarker) {
+ if (shouldForceWrite) {
+ // if we are going to force write, any request that is already in the
+ // queue will benefit from this force write - post a marker prior to issuing
+ // the flush so until this marker is encountered we can skip the force write
+ if (enableGroupForceWrites) {
+ ForceWriteRequest marker =
+ createForceWriteRequest(req.logFile, 0, 0, null, false, true);
+ forceWriteMarkerSent = forceWriteRequests.offer(marker);
+ if (!forceWriteMarkerSent) {
+ marker.recycle();
+ Counter failures = journalStats.getForceWriteGroupingFailures();
+ failures.inc();
+ LOG.error(
+ "Fail to send force write grouping marker,"
+ + " Journal.forceWriteRequests queue(capacity {}) is full,"
+ + " current failure counter is {}.",
+ conf.getJournalQueueSize(), failures.get());
+ }
+ }
+
+ // If we are about to issue a write, record the number of requests in
+ // the last force write and then reset the counter so we can accumulate
+ // requests in the write we are about to issue
+ if (numReqInLastForceWrite > 0) {
+ journalStats.getForceWriteGroupingCountStats()
+ .registerSuccessfulValue(numReqInLastForceWrite);
+ numReqInLastForceWrite = 0;
+ }
+ }
+ }
+ numReqInLastForceWrite += req.process(shouldForceWrite);
+
+ if (enableGroupForceWrites
+ // if its a marker we should switch back to flushing
+ && !req.isMarker
+ // If group marker sending failed, we can't figure out which writes are
+ // grouped in this force write. So, abandon it even if other writes could
+ // be grouped. This should be extremely rare as, usually, queue size is
+ // large enough to accommodate high flush frequencies.
+ && forceWriteMarkerSent
+ // This indicates that this is the last request in a given file
+ // so subsequent requests will go to a different file so we should
+ // flush on the next request
+ && !req.shouldClose) {
+ shouldForceWrite = false;
+ } else {
+ shouldForceWrite = true;
+ }
+ } catch (IOException ioe) {
+ LOG.error("I/O exception in ForceWrite thread", ioe);
+ running = false;
+ } finally {
+ if (req != null) {
+ req.recycle();
+ }
+ }
+ }
+
// shutdown sync thread
void shutdown() throws InterruptedException {
running = false;