This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/master by this push:
new 39a539e Removed all per-entry allocations in Journal operations
39a539e is described below
commit 39a539e5c5a3381cad9e5e0da334c7f359e17c3b
Author: Ivan Kelly <[email protected]>
AuthorDate: Sun Dec 3 23:26:03 2017 -0800
Removed all per-entry allocations in Journal operations
This change was originally b2cc158a from the yahoo-4.3 branch.
Author: Ivan Kelly <[email protected]>
Author: Matteo Merli <[email protected]>
Reviewers: Enrico Olivelli <[email protected]>, Jia Zhai <None>, Sijie Guo <[email protected]>
This closes #798 from ivankelly/yahoo-bp-5
---
.../java/org/apache/bookkeeper/bookie/Journal.java | 168 ++++++++++++++-------
.../util/collections/RecyclableArrayList.java | 54 +++++++
2 files changed, 171 insertions(+), 51 deletions(-)
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
index 1a3635c..d2dcf0e 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
@@ -24,6 +24,9 @@ package org.apache.bookkeeper.bookie;
import com.google.common.base.Stopwatch;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
+import io.netty.util.Recycler;
+import io.netty.util.Recycler.Handle;
+
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
@@ -31,11 +34,10 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
-import java.util.LinkedList;
import java.util.List;
+import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
-import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import
org.apache.bookkeeper.bookie.LedgerDirsManager.NoWritableLedgerDirException;
import org.apache.bookkeeper.conf.ServerConfiguration;
@@ -48,6 +50,8 @@ import org.apache.bookkeeper.util.DaemonThreadFactory;
import org.apache.bookkeeper.util.IOUtils;
import org.apache.bookkeeper.util.MathUtils;
import org.apache.bookkeeper.util.ZeroBuffer;
+import org.apache.bookkeeper.util.collections.GrowableArrayBlockingQueue;
+import org.apache.bookkeeper.util.collections.RecyclableArrayList;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -58,6 +62,11 @@ public class Journal extends BookieCriticalThread implements
CheckpointSource {
private static final Logger LOG = LoggerFactory.getLogger(Journal.class);
+ private static final RecyclableArrayList.Recycler<QueueEntry>
entryListRecycler
+ = new RecyclableArrayList.Recycler<QueueEntry>();
+ private static final RecyclableArrayList<QueueEntry> EMPTY_ARRAY_LIST
+ = entryListRecycler.newInstance();
+
/**
* Filter to pickup journals.
*/
@@ -270,7 +279,7 @@ public class Journal extends BookieCriticalThread
implements CheckpointSource {
/**
* Journal Entry to Record.
*/
- private class QueueEntry implements Runnable {
+ private static class QueueEntry implements Runnable {
ByteBuf entry;
long ledgerId;
long entryId;
@@ -278,13 +287,19 @@ public class Journal extends BookieCriticalThread
implements CheckpointSource {
Object ctx;
long enqueueTime;
- QueueEntry(ByteBuf entry, long ledgerId, long entryId, WriteCallback
cb, Object ctx, long enqueueTime) {
- this.entry = entry.duplicate();
- this.cb = cb;
- this.ctx = ctx;
- this.ledgerId = ledgerId;
- this.entryId = entryId;
- this.enqueueTime = enqueueTime;
+ OpStatsLogger journalAddEntryStats;
+
+ static QueueEntry create(ByteBuf entry, long ledgerId, long entryId,
WriteCallback cb, Object ctx,
+ long enqueueTime, OpStatsLogger journalAddEntryStats) {
+ QueueEntry qe = RECYCLER.get();
+ qe.entry = entry.duplicate();
+ qe.cb = cb;
+ qe.ctx = ctx;
+ qe.ledgerId = ledgerId;
+ qe.entryId = entryId;
+ qe.enqueueTime = enqueueTime;
+ qe.journalAddEntryStats = journalAddEntryStats;
+ return qe;
}
@Override
@@ -294,31 +309,33 @@ public class Journal extends BookieCriticalThread
implements CheckpointSource {
}
journalAddEntryStats.registerSuccessfulEvent(MathUtils.elapsedNanos(enqueueTime),
TimeUnit.NANOSECONDS);
cb.writeComplete(0, ledgerId, entryId, null, ctx);
+ recycle();
+ }
+
+ private final Handle<QueueEntry> recyclerHandle;
+
+ private QueueEntry(Handle<QueueEntry> recyclerHandle) {
+ this.recyclerHandle = recyclerHandle;
+ }
+
+ private static final Recycler<QueueEntry> RECYCLER = new
Recycler<QueueEntry>() {
+ protected QueueEntry newObject(Recycler.Handle<QueueEntry> handle)
{
+ return new QueueEntry(handle);
+ }
+ };
+
+ private void recycle() {
+ recyclerHandle.recycle(this);
}
}
private class ForceWriteRequest {
- private final JournalChannel logFile;
- private final LinkedList<QueueEntry> forceWriteWaiters;
+ private JournalChannel logFile;
+ private RecyclableArrayList<QueueEntry> forceWriteWaiters;
private boolean shouldClose;
- private final boolean isMarker;
- private final long lastFlushedPosition;
- private final long logId;
-
- private ForceWriteRequest(JournalChannel logFile,
- long logId,
- long lastFlushedPosition,
- LinkedList<QueueEntry> forceWriteWaiters,
- boolean shouldClose,
- boolean isMarker) {
- this.forceWriteWaiters = forceWriteWaiters;
- this.logFile = logFile;
- this.logId = logId;
- this.lastFlushedPosition = lastFlushedPosition;
- this.shouldClose = shouldClose;
- this.isMarker = isMarker;
- forceWriteQueueSize.inc();
- }
+ private boolean isMarker;
+ private long lastFlushedPosition;
+ private long logId;
public int process(boolean shouldForceWrite) throws IOException {
forceWriteQueueSize.dec();
@@ -335,11 +352,11 @@ public class Journal extends BookieCriticalThread
implements CheckpointSource {
lastLogMark.setCurLogMark(this.logId,
this.lastFlushedPosition);
// Notify the waiters that the force write succeeded
- for (QueueEntry e : this.forceWriteWaiters) {
- cbThreadPool.execute(e);
+ for (int i = 0; i < forceWriteWaiters.size(); i++) {
+ cbThreadPool.execute(forceWriteWaiters.get(i));
}
- return this.forceWriteWaiters.size();
+ return forceWriteWaiters.size();
} finally {
closeFileIfNecessary();
}
@@ -359,8 +376,48 @@ public class Journal extends BookieCriticalThread
implements CheckpointSource {
}
}
}
+
+ private final Handle<ForceWriteRequest> recyclerHandle;
+
+ private ForceWriteRequest(Handle<ForceWriteRequest> recyclerHandle) {
+ this.recyclerHandle = recyclerHandle;
+ }
+
+ private void recycle() {
+ logFile = null;
+ if (forceWriteWaiters != null) {
+ forceWriteWaiters.recycle();
+ forceWriteWaiters = null;
+ }
+ recyclerHandle.recycle(this);
+ }
+ }
+
+ private ForceWriteRequest createForceWriteRequest(JournalChannel logFile,
+ long logId,
+ long lastFlushedPosition,
+ RecyclableArrayList<QueueEntry> forceWriteWaiters,
+ boolean shouldClose,
+ boolean isMarker) {
+ ForceWriteRequest req = forceWriteRequestsRecycler.get();
+ req.forceWriteWaiters = forceWriteWaiters;
+ req.logFile = logFile;
+ req.logId = logId;
+ req.lastFlushedPosition = lastFlushedPosition;
+ req.shouldClose = shouldClose;
+ req.isMarker = isMarker;
+ forceWriteQueueSize.inc();
+ return req;
}
+ private final Recycler<ForceWriteRequest> forceWriteRequestsRecycler
+ = new Recycler<ForceWriteRequest>() {
+ protected ForceWriteRequest newObject(
+ Recycler.Handle<ForceWriteRequest> handle) {
+ return new ForceWriteRequest(handle);
+ }
+ };
+
/**
* ForceWriteThread is a background thread which makes the journal durable
periodically.
*
@@ -396,7 +453,7 @@ public class Journal extends BookieCriticalThread
implements CheckpointSource {
// queue will benefit from this force write - post
a marker prior to issuing
// the flush so until this marker is encountered
we can skip the force write
if (enableGroupForceWrites) {
- forceWriteRequests.put(new
ForceWriteRequest(req.logFile, 0, 0, null, false, true));
+
forceWriteRequests.put(createForceWriteRequest(req.logFile, 0, 0, null, false,
true));
}
// If we are about to issue a write, record the
number of requests in
@@ -432,6 +489,10 @@ public class Journal extends BookieCriticalThread
implements CheckpointSource {
req.closeFileIfNecessary();
}
running = false;
+ } finally {
+ if (req != null) {
+ req.recycle();
+ }
}
}
// Regardless of what caused us to exit, we should notify the
@@ -510,8 +571,8 @@ public class Journal extends BookieCriticalThread
implements CheckpointSource {
private final ExecutorService cbThreadPool;
// journal entry queue to commit
- final LinkedBlockingQueue<QueueEntry> queue = new
LinkedBlockingQueue<QueueEntry>();
- final LinkedBlockingQueue<ForceWriteRequest> forceWriteRequests = new
LinkedBlockingQueue<ForceWriteRequest>();
+ final BlockingQueue<QueueEntry> queue = new
GrowableArrayBlockingQueue<QueueEntry>();
+ final BlockingQueue<ForceWriteRequest> forceWriteRequests = new
GrowableArrayBlockingQueue<ForceWriteRequest>();
volatile boolean running = true;
private final LedgerDirsManager ledgerDirsManager;
@@ -769,7 +830,7 @@ public class Journal extends BookieCriticalThread
implements CheckpointSource {
//Retain entry until it gets written to journal
entry.retain();
- queue.add(new QueueEntry(entry, ledgerId, entryId, cb, ctx,
MathUtils.nowInNano()));
+ queue.add(QueueEntry.create(entry, ledgerId, entryId, cb, ctx,
MathUtils.nowInNano(), journalAddEntryStats));
}
/**
@@ -799,7 +860,8 @@ public class Journal extends BookieCriticalThread
implements CheckpointSource {
@Override
public void run() {
LOG.info("Starting journal on {}", journalDirectory);
- LinkedList<QueueEntry> toFlush = new LinkedList<QueueEntry>();
+
+ RecyclableArrayList<QueueEntry> toFlush =
entryListRecycler.newInstance();
ByteBuffer lenBuff = ByteBuffer.allocate(4);
ByteBuffer paddingBuff = ByteBuffer.allocate(2 *
conf.getJournalAlignmentSize());
ZeroBuffer.put(paddingBuff);
@@ -851,8 +913,8 @@ public class Journal extends BookieCriticalThread
implements CheckpointSource {
journalQueueStats.registerSuccessfulEvent(MathUtils.elapsedNanos(qe.enqueueTime),
TimeUnit.NANOSECONDS);
} else {
- long pollWaitTimeNanos =
- maxGroupWaitInNanos -
MathUtils.elapsedNanos(toFlush.get(0).enqueueTime);
+ long pollWaitTimeNanos = maxGroupWaitInNanos
+ -
MathUtils.elapsedNanos(toFlush.get(0).enqueueTime);
if (flushWhenQueueEmpty || pollWaitTimeNanos < 0) {
pollWaitTimeNanos = 0;
}
@@ -867,8 +929,8 @@ public class Journal extends BookieCriticalThread
implements CheckpointSource {
boolean shouldFlush = false;
// We should issue a forceWrite if any of the three
conditions below holds good
// 1. If the oldest pending entry has been pending for
longer than the max wait time
- if (maxGroupWaitInNanos > 0 && !groupWhenTimeout
- &&
(MathUtils.elapsedNanos(toFlush.get(0).enqueueTime) > maxGroupWaitInNanos)) {
+ if (maxGroupWaitInNanos > 0 && !groupWhenTimeout &&
(MathUtils
+ .elapsedNanos(toFlush.get(0).enqueueTime) >
maxGroupWaitInNanos)) {
groupWhenTimeout = true;
} else if (maxGroupWaitInNanos > 0 && groupWhenTimeout
&& qe != null
&& MathUtils.elapsedNanos(qe.enqueueTime) <
maxGroupWaitInNanos) {
@@ -910,8 +972,8 @@ public class Journal extends BookieCriticalThread
implements CheckpointSource {
// Trace the lifetime of entries through
persistence
if (LOG.isDebugEnabled()) {
for (QueueEntry e : toFlush) {
- LOG.debug("Written and queuing for flush
Ledger:" + e.ledgerId + " Entry:"
- + e.entryId);
+ LOG.debug("Written and queuing for flush
Ledger: {} Entry: {}",
+ e.ledgerId, e.entryId);
}
}
@@ -920,26 +982,30 @@ public class Journal extends BookieCriticalThread
implements CheckpointSource {
boolean shouldRolloverJournal = (lastFlushPosition
> maxJournalSize);
if (syncData) {
- // Trigger data sync to disk in the
"Force-Write" thread. Callback will be triggered after data is committed to disk
- forceWriteRequests.put(new
ForceWriteRequest(logFile, logId, lastFlushPosition, toFlush,
shouldRolloverJournal, false));
- toFlush = new LinkedList<QueueEntry>();
+ // Trigger data sync to disk in the
"Force-Write" thread.
+ // Callback will be triggered after data is
committed to disk
+
forceWriteRequests.put(createForceWriteRequest(logFile, logId,
lastFlushPosition,
+
toFlush, shouldRolloverJournal, false));
+ toFlush = entryListRecycler.newInstance();
} else {
// Data is already written on the file (though
it might still be in the OS page-cache)
lastLogMark.setCurLogMark(logId,
lastFlushPosition);
for (int i = 0; i < toFlush.size(); i++) {
- cbThreadPool.execute((QueueEntry)
toFlush.get(i));
+ cbThreadPool.execute(toFlush.get(i));
}
toFlush.clear();
if (shouldRolloverJournal) {
- forceWriteRequests.put(new
ForceWriteRequest(logFile, logId, lastFlushPosition,
- new LinkedList<>(),
shouldRolloverJournal, false));
+ forceWriteRequests.put(
+ createForceWriteRequest(
+ logFile, logId,
lastFlushPosition,
+ EMPTY_ARRAY_LIST,
shouldRolloverJournal, false));
}
}
batchSize = 0L;
// check whether journal file is over file limit
- if (bc.position() > maxJournalSize) {
+ if (shouldRolloverJournal) {
logFile = null;
continue;
}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/RecyclableArrayList.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/RecyclableArrayList.java
new file mode 100644
index 0000000..1febc0c
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/RecyclableArrayList.java
@@ -0,0 +1,54 @@
+// Originally copied from netty project, version 4.1.17-Final, heavily modified
+/*
+ * Copyright 2013 The Netty Project
+ *
+ * The Netty Project licenses this file to you under the Apache License,
+ * version 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.bookkeeper.util.collections;
+
+import io.netty.util.Recycler.Handle;
+import java.util.ArrayList;
+
+/**
+ * A simple list which is recyclable.
+ */
+public final class RecyclableArrayList<T> extends ArrayList<T> {
+
+ private static final int DEFAULT_INITIAL_CAPACITY = 8;
+
+ public static class Recycler<X>
+ extends io.netty.util.Recycler<RecyclableArrayList<X>> {
+ @Override
+ protected RecyclableArrayList<X> newObject(
+ Handle<RecyclableArrayList<X>> handle) {
+ return new RecyclableArrayList<X>(handle,
DEFAULT_INITIAL_CAPACITY);
+ }
+
+ public RecyclableArrayList<X> newInstance() {
+ return get();
+ }
+ }
+
+ private final Handle<RecyclableArrayList<T>> handle;
+
+ private RecyclableArrayList(Handle<RecyclableArrayList<T>> handle, int
initialCapacity) {
+ super(initialCapacity);
+ this.handle = handle;
+ }
+
+ public void recycle() {
+ clear();
+ handle.recycle(this);
+ }
+}
--
To stop receiving notification emails like this one, please contact
"[email protected]" <[email protected]>.