This is an automated email from the ASF dual-hosted git repository.
eolivelli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/master by this push:
new 16a4365d7d Use BlockingQueue.drainTo() in Journal thread (#3544)
16a4365d7d is described below
commit 16a4365d7dcaafb2b0fc5ae6d43b4bee0af78b98
Author: Matteo Merli <[email protected]>
AuthorDate: Thu Oct 20 08:12:03 2022 -0700
Use BlockingQueue.drainTo() in Journal thread (#3544)
---
.../java/org/apache/bookkeeper/bookie/Journal.java | 44 +++++++++++++++++-----
1 file changed, 35 insertions(+), 9 deletions(-)
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
index a3e0086b69..d28c1cac09 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
@@ -36,6 +36,7 @@ import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
+import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
@@ -1071,6 +1072,8 @@ public class Journal extends BookieCriticalThread
implements CheckpointSource {
long lastFlushTimeMs = System.currentTimeMillis();
long busyStartTime = System.nanoTime();
+ ArrayDeque<QueueEntry> localQueueEntries = new ArrayDeque<>();
+
QueueEntry qe = null;
while (true) {
// new journal file to write
@@ -1103,22 +1106,45 @@ public class Journal extends BookieCriticalThread
implements CheckpointSource {
.registerSuccessfulEvent(MathUtils.elapsedNanos(dequeueStartTime),
TimeUnit.NANOSECONDS);
}
- if (numEntriesToFlush == 0) {
+ if (numEntriesToFlush == 0 && localQueueEntries.isEmpty())
{
+ queue.drainTo(localQueueEntries);
+
journalTime.addLatency(MathUtils.elapsedNanos(busyStartTime),
TimeUnit.NANOSECONDS);
- qe = queue.take();
+ if (!localQueueEntries.isEmpty()) {
+ qe = localQueueEntries.removeFirst();
+ } else {
+ qe = queue.take();
+ }
+
dequeueStartTime = MathUtils.nowInNano();
busyStartTime = dequeueStartTime;
journalStats.getJournalQueueSize().dec();
journalStats.getJournalQueueStats()
-
.registerSuccessfulEvent(MathUtils.elapsedNanos(qe.enqueueTime),
TimeUnit.NANOSECONDS);
+
.registerSuccessfulEvent(MathUtils.elapsedNanos(qe.enqueueTime),
+ TimeUnit.NANOSECONDS);
} else {
- long pollWaitTimeNanos = maxGroupWaitInNanos
- -
MathUtils.elapsedNanos(toFlush.get(0).enqueueTime);
- if (flushWhenQueueEmpty || pollWaitTimeNanos < 0) {
- pollWaitTimeNanos = 0;
+ if (localQueueEntries.isEmpty()) {
+ queue.drainTo(localQueueEntries);
+ }
+
+ if (!localQueueEntries.isEmpty()) {
+
journalTime.addLatency(MathUtils.elapsedNanos(busyStartTime),
TimeUnit.NANOSECONDS);
+ qe = localQueueEntries.removeFirst();
+ dequeueStartTime = MathUtils.nowInNano();
+ busyStartTime = dequeueStartTime;
+ journalStats.getJournalQueueSize().dec();
+ journalStats.getJournalQueueStats()
+
.registerSuccessfulEvent(MathUtils.elapsedNanos(qe.enqueueTime),
+ TimeUnit.NANOSECONDS);
+ } else {
+ long pollWaitTimeNanos = maxGroupWaitInNanos
+ -
MathUtils.elapsedNanos(toFlush.get(0).enqueueTime);
+ if (flushWhenQueueEmpty || pollWaitTimeNanos < 0) {
+ pollWaitTimeNanos = 0;
+ }
+ qe = queue.poll(pollWaitTimeNanos,
TimeUnit.NANOSECONDS);
+ dequeueStartTime = MathUtils.nowInNano();
}
- qe = queue.poll(pollWaitTimeNanos,
TimeUnit.NANOSECONDS);
- dequeueStartTime = MathUtils.nowInNano();
if (qe != null) {
journalStats.getJournalQueueSize().dec();