This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new 39a539e  Removed all per-entry allocations in Journal operations
39a539e is described below

commit 39a539e5c5a3381cad9e5e0da334c7f359e17c3b
Author: Ivan Kelly <[email protected]>
AuthorDate: Sun Dec 3 23:26:03 2017 -0800

    Removed all per-entry allocations in Journal operations
    
    This change was originally b2cc158a from the yahoo-4.3 branch.
    
    Author: Ivan Kelly <[email protected]>
    Author: Matteo Merli <[email protected]>
    
    Reviewers: Enrico Olivelli <[email protected]>, Jia Zhai <None>, Sijie 
Guo <[email protected]>
    
    This closes #798 from ivankelly/yahoo-bp-5
---
 .../java/org/apache/bookkeeper/bookie/Journal.java | 168 ++++++++++++++-------
 .../util/collections/RecyclableArrayList.java      |  54 +++++++
 2 files changed, 171 insertions(+), 51 deletions(-)

diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
index 1a3635c..d2dcf0e 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
@@ -24,6 +24,9 @@ package org.apache.bookkeeper.bookie;
 import com.google.common.base.Stopwatch;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
+import io.netty.util.Recycler;
+import io.netty.util.Recycler.Handle;
+
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.FileOutputStream;
@@ -31,11 +34,10 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.LinkedList;
 import java.util.List;
+import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 import 
org.apache.bookkeeper.bookie.LedgerDirsManager.NoWritableLedgerDirException;
 import org.apache.bookkeeper.conf.ServerConfiguration;
@@ -48,6 +50,8 @@ import org.apache.bookkeeper.util.DaemonThreadFactory;
 import org.apache.bookkeeper.util.IOUtils;
 import org.apache.bookkeeper.util.MathUtils;
 import org.apache.bookkeeper.util.ZeroBuffer;
+import org.apache.bookkeeper.util.collections.GrowableArrayBlockingQueue;
+import org.apache.bookkeeper.util.collections.RecyclableArrayList;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -58,6 +62,11 @@ public class Journal extends BookieCriticalThread implements 
CheckpointSource {
 
     private static final Logger LOG = LoggerFactory.getLogger(Journal.class);
 
+    private static final RecyclableArrayList.Recycler<QueueEntry> 
entryListRecycler
+        = new RecyclableArrayList.Recycler<QueueEntry>();
+    private static final RecyclableArrayList<QueueEntry> EMPTY_ARRAY_LIST
+        = entryListRecycler.newInstance();
+
     /**
      * Filter to pickup journals.
      */
@@ -270,7 +279,7 @@ public class Journal extends BookieCriticalThread 
implements CheckpointSource {
     /**
      * Journal Entry to Record.
      */
-    private class QueueEntry implements Runnable {
+    private static class QueueEntry implements Runnable {
         ByteBuf entry;
         long ledgerId;
         long entryId;
@@ -278,13 +287,19 @@ public class Journal extends BookieCriticalThread 
implements CheckpointSource {
         Object ctx;
         long enqueueTime;
 
-        QueueEntry(ByteBuf entry, long ledgerId, long entryId, WriteCallback 
cb, Object ctx, long enqueueTime) {
-            this.entry = entry.duplicate();
-            this.cb = cb;
-            this.ctx = ctx;
-            this.ledgerId = ledgerId;
-            this.entryId = entryId;
-            this.enqueueTime = enqueueTime;
+        OpStatsLogger journalAddEntryStats;
+
+        static QueueEntry create(ByteBuf entry, long ledgerId, long entryId, 
WriteCallback cb, Object ctx,
+                long enqueueTime, OpStatsLogger journalAddEntryStats) {
+            QueueEntry qe = RECYCLER.get();
+            qe.entry = entry.duplicate();
+            qe.cb = cb;
+            qe.ctx = ctx;
+            qe.ledgerId = ledgerId;
+            qe.entryId = entryId;
+            qe.enqueueTime = enqueueTime;
+            qe.journalAddEntryStats = journalAddEntryStats;
+            return qe;
         }
 
         @Override
@@ -294,31 +309,33 @@ public class Journal extends BookieCriticalThread 
implements CheckpointSource {
             }
             
journalAddEntryStats.registerSuccessfulEvent(MathUtils.elapsedNanos(enqueueTime),
 TimeUnit.NANOSECONDS);
             cb.writeComplete(0, ledgerId, entryId, null, ctx);
+            recycle();
+        }
+
+        private final Handle<QueueEntry> recyclerHandle;
+
+        private QueueEntry(Handle<QueueEntry> recyclerHandle) {
+            this.recyclerHandle = recyclerHandle;
+        }
+
+        private static final Recycler<QueueEntry> RECYCLER = new 
Recycler<QueueEntry>() {
+            protected QueueEntry newObject(Recycler.Handle<QueueEntry> handle) 
{
+                return new QueueEntry(handle);
+            }
+        };
+
+        private void recycle() {
+            recyclerHandle.recycle(this);
         }
     }
 
     private class ForceWriteRequest {
-        private final JournalChannel logFile;
-        private final LinkedList<QueueEntry> forceWriteWaiters;
+        private JournalChannel logFile;
+        private RecyclableArrayList<QueueEntry> forceWriteWaiters;
         private boolean shouldClose;
-        private final boolean isMarker;
-        private final long lastFlushedPosition;
-        private final long logId;
-
-        private ForceWriteRequest(JournalChannel logFile,
-                          long logId,
-                          long lastFlushedPosition,
-                          LinkedList<QueueEntry> forceWriteWaiters,
-                          boolean shouldClose,
-                          boolean isMarker) {
-            this.forceWriteWaiters = forceWriteWaiters;
-            this.logFile = logFile;
-            this.logId = logId;
-            this.lastFlushedPosition = lastFlushedPosition;
-            this.shouldClose = shouldClose;
-            this.isMarker = isMarker;
-            forceWriteQueueSize.inc();
-        }
+        private boolean isMarker;
+        private long lastFlushedPosition;
+        private long logId;
 
         public int process(boolean shouldForceWrite) throws IOException {
             forceWriteQueueSize.dec();
@@ -335,11 +352,11 @@ public class Journal extends BookieCriticalThread 
implements CheckpointSource {
                 lastLogMark.setCurLogMark(this.logId, 
this.lastFlushedPosition);
 
                 // Notify the waiters that the force write succeeded
-                for (QueueEntry e : this.forceWriteWaiters) {
-                    cbThreadPool.execute(e);
+                for (int i = 0; i < forceWriteWaiters.size(); i++) {
+                    cbThreadPool.execute(forceWriteWaiters.get(i));
                 }
 
-                return this.forceWriteWaiters.size();
+                return forceWriteWaiters.size();
             } finally {
                 closeFileIfNecessary();
             }
@@ -359,8 +376,48 @@ public class Journal extends BookieCriticalThread 
implements CheckpointSource {
                 }
             }
         }
+
+        private final Handle<ForceWriteRequest> recyclerHandle;
+
+        private ForceWriteRequest(Handle<ForceWriteRequest> recyclerHandle) {
+            this.recyclerHandle = recyclerHandle;
+        }
+
+        private void recycle() {
+            logFile = null;
+            if (forceWriteWaiters != null) {
+                forceWriteWaiters.recycle();
+                forceWriteWaiters = null;
+            }
+            recyclerHandle.recycle(this);
+        }
+    }
+
+    private ForceWriteRequest createForceWriteRequest(JournalChannel logFile,
+                          long logId,
+                          long lastFlushedPosition,
+                          RecyclableArrayList<QueueEntry> forceWriteWaiters,
+                          boolean shouldClose,
+                          boolean isMarker) {
+        ForceWriteRequest req = forceWriteRequestsRecycler.get();
+        req.forceWriteWaiters = forceWriteWaiters;
+        req.logFile = logFile;
+        req.logId = logId;
+        req.lastFlushedPosition = lastFlushedPosition;
+        req.shouldClose = shouldClose;
+        req.isMarker = isMarker;
+        forceWriteQueueSize.inc();
+        return req;
     }
 
+    private final Recycler<ForceWriteRequest> forceWriteRequestsRecycler
+        = new Recycler<ForceWriteRequest>() {
+                protected ForceWriteRequest newObject(
+                        Recycler.Handle<ForceWriteRequest> handle) {
+                    return new ForceWriteRequest(handle);
+                }
+            };
+
     /**
      * ForceWriteThread is a background thread which makes the journal durable 
periodically.
      *
@@ -396,7 +453,7 @@ public class Journal extends BookieCriticalThread 
implements CheckpointSource {
                             // queue will benefit from this force write - post 
a marker prior to issuing
                             // the flush so until this marker is encountered 
we can skip the force write
                             if (enableGroupForceWrites) {
-                                forceWriteRequests.put(new 
ForceWriteRequest(req.logFile, 0, 0, null, false, true));
+                                
forceWriteRequests.put(createForceWriteRequest(req.logFile, 0, 0, null, false, 
true));
                             }
 
                             // If we are about to issue a write, record the 
number of requests in
@@ -432,6 +489,10 @@ public class Journal extends BookieCriticalThread 
implements CheckpointSource {
                         req.closeFileIfNecessary();
                     }
                     running = false;
+                } finally {
+                    if (req != null) {
+                        req.recycle();
+                    }
                 }
             }
             // Regardless of what caused us to exit, we should notify the
@@ -510,8 +571,8 @@ public class Journal extends BookieCriticalThread 
implements CheckpointSource {
     private final ExecutorService cbThreadPool;
 
     // journal entry queue to commit
-    final LinkedBlockingQueue<QueueEntry> queue = new 
LinkedBlockingQueue<QueueEntry>();
-    final LinkedBlockingQueue<ForceWriteRequest> forceWriteRequests = new 
LinkedBlockingQueue<ForceWriteRequest>();
+    final BlockingQueue<QueueEntry> queue = new 
GrowableArrayBlockingQueue<QueueEntry>();
+    final BlockingQueue<ForceWriteRequest> forceWriteRequests = new 
GrowableArrayBlockingQueue<ForceWriteRequest>();
 
     volatile boolean running = true;
     private final LedgerDirsManager ledgerDirsManager;
@@ -769,7 +830,7 @@ public class Journal extends BookieCriticalThread 
implements CheckpointSource {
 
         //Retain entry until it gets written to journal
         entry.retain();
-        queue.add(new QueueEntry(entry, ledgerId, entryId, cb, ctx, 
MathUtils.nowInNano()));
+        queue.add(QueueEntry.create(entry, ledgerId, entryId, cb, ctx, 
MathUtils.nowInNano(), journalAddEntryStats));
     }
 
     /**
@@ -799,7 +860,8 @@ public class Journal extends BookieCriticalThread 
implements CheckpointSource {
     @Override
     public void run() {
         LOG.info("Starting journal on {}", journalDirectory);
-        LinkedList<QueueEntry> toFlush = new LinkedList<QueueEntry>();
+
+        RecyclableArrayList<QueueEntry> toFlush = 
entryListRecycler.newInstance();
         ByteBuffer lenBuff = ByteBuffer.allocate(4);
         ByteBuffer paddingBuff = ByteBuffer.allocate(2 * 
conf.getJournalAlignmentSize());
         ZeroBuffer.put(paddingBuff);
@@ -851,8 +913,8 @@ public class Journal extends BookieCriticalThread 
implements CheckpointSource {
                         
journalQueueStats.registerSuccessfulEvent(MathUtils.elapsedNanos(qe.enqueueTime),
                                 TimeUnit.NANOSECONDS);
                     } else {
-                        long pollWaitTimeNanos =
-                                maxGroupWaitInNanos - 
MathUtils.elapsedNanos(toFlush.get(0).enqueueTime);
+                        long pollWaitTimeNanos = maxGroupWaitInNanos
+                                - 
MathUtils.elapsedNanos(toFlush.get(0).enqueueTime);
                         if (flushWhenQueueEmpty || pollWaitTimeNanos < 0) {
                             pollWaitTimeNanos = 0;
                         }
@@ -867,8 +929,8 @@ public class Journal extends BookieCriticalThread 
implements CheckpointSource {
                         boolean shouldFlush = false;
                         // We should issue a forceWrite if any of the three 
conditions below holds good
                         // 1. If the oldest pending entry has been pending for 
longer than the max wait time
-                        if (maxGroupWaitInNanos > 0 && !groupWhenTimeout
-                                && 
(MathUtils.elapsedNanos(toFlush.get(0).enqueueTime) > maxGroupWaitInNanos)) {
+                        if (maxGroupWaitInNanos > 0 && !groupWhenTimeout && 
(MathUtils
+                                .elapsedNanos(toFlush.get(0).enqueueTime) > 
maxGroupWaitInNanos)) {
                             groupWhenTimeout = true;
                         } else if (maxGroupWaitInNanos > 0 && groupWhenTimeout 
&& qe != null
                                 && MathUtils.elapsedNanos(qe.enqueueTime) < 
maxGroupWaitInNanos) {
@@ -910,8 +972,8 @@ public class Journal extends BookieCriticalThread 
implements CheckpointSource {
                             // Trace the lifetime of entries through 
persistence
                             if (LOG.isDebugEnabled()) {
                                 for (QueueEntry e : toFlush) {
-                                    LOG.debug("Written and queuing for flush 
Ledger:" + e.ledgerId + " Entry:"
-                                            + e.entryId);
+                                    LOG.debug("Written and queuing for flush 
Ledger: {}  Entry: {}",
+                                              e.ledgerId, e.entryId);
                                 }
                             }
 
@@ -920,26 +982,30 @@ public class Journal extends BookieCriticalThread 
implements CheckpointSource {
 
                             boolean shouldRolloverJournal = (lastFlushPosition 
> maxJournalSize);
                             if (syncData) {
-                                // Trigger data sync to disk in the 
"Force-Write" thread. Callback will be triggered after data is committed to disk
-                                forceWriteRequests.put(new 
ForceWriteRequest(logFile, logId, lastFlushPosition, toFlush, 
shouldRolloverJournal, false));
-                                toFlush = new LinkedList<QueueEntry>();
+                                // Trigger data sync to disk in the 
"Force-Write" thread.
+                                // Callback will be triggered after data is 
committed to disk
+                                
forceWriteRequests.put(createForceWriteRequest(logFile, logId, 
lastFlushPosition,
+                                                                               
toFlush, shouldRolloverJournal, false));
+                                toFlush = entryListRecycler.newInstance();
                             } else {
                                 // Data is already written on the file (though 
it might still be in the OS page-cache)
                                 lastLogMark.setCurLogMark(logId, 
lastFlushPosition);
                                 for (int i = 0; i < toFlush.size(); i++) {
-                                    cbThreadPool.execute((QueueEntry) 
toFlush.get(i));
+                                    cbThreadPool.execute(toFlush.get(i));
                                 }
 
                                 toFlush.clear();
                                 if (shouldRolloverJournal) {
-                                    forceWriteRequests.put(new 
ForceWriteRequest(logFile, logId, lastFlushPosition,
-                                            new LinkedList<>(), 
shouldRolloverJournal, false));
+                                    forceWriteRequests.put(
+                                            createForceWriteRequest(
+                                                    logFile, logId, 
lastFlushPosition,
+                                                    EMPTY_ARRAY_LIST, 
shouldRolloverJournal, false));
                                 }
                             }
 
                             batchSize = 0L;
                             // check whether journal file is over file limit
-                            if (bc.position() > maxJournalSize) {
+                            if (shouldRolloverJournal) {
                                 logFile = null;
                                 continue;
                             }
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/RecyclableArrayList.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/RecyclableArrayList.java
new file mode 100644
index 0000000..1febc0c
--- /dev/null
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/RecyclableArrayList.java
@@ -0,0 +1,54 @@
+// Originally copied from netty project, version 4.1.17-Final, heavily modified
+/*
+ * Copyright 2013 The Netty Project
+ *
+ * The Netty Project licenses this file to you under the Apache License,
+ * version 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at:
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.bookkeeper.util.collections;
+
+import io.netty.util.Recycler.Handle;
+import java.util.ArrayList;
+
+/**
+ * A simple list which is recyclable.
+ */
+public final class RecyclableArrayList<T> extends ArrayList<T> {
+
+    private static final int DEFAULT_INITIAL_CAPACITY = 8;
+
+    public static class Recycler<X>
+        extends io.netty.util.Recycler<RecyclableArrayList<X>> {
+        @Override
+        protected RecyclableArrayList<X> newObject(
+                Handle<RecyclableArrayList<X>> handle) {
+            return new RecyclableArrayList<X>(handle, 
DEFAULT_INITIAL_CAPACITY);
+        }
+
+        public RecyclableArrayList<X> newInstance() {
+            return get();
+        }
+    }
+
+    private final Handle<RecyclableArrayList<T>> handle;
+
+    private RecyclableArrayList(Handle<RecyclableArrayList<T>> handle, int 
initialCapacity) {
+        super(initialCapacity);
+        this.handle = handle;
+    }
+
+    public void recycle() {
+        clear();
+        handle.recycle(this);
+    }
+}

-- 
To stop receiving notification emails like this one, please contact
['"[email protected]" <[email protected]>'].

Reply via email to