This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new 34e8bf2  Recycle PendingAddOps
34e8bf2 is described below

commit 34e8bf200c6f3797bd6fa4c5d86646e9eb7f0d3b
Author: Ivan Kelly <[email protected]>
AuthorDate: Tue Oct 31 19:31:21 2017 -0700

    Recycle PendingAddOps
    
    Avoid creating a new PendingAddOp object for each entry added, thus
    reducing the garbage generated on every add.
    
    Originally commit 55ba4723 on the yahoo-4.3 branch.
    
    Author: Ivan Kelly <[email protected]>
    Author: Matteo Merli <[email protected]>
    
    Reviewers: Jia Zhai <None>, Sijie Guo <[email protected]>
    
    This closes #664 from ivankelly/yahoo-bp-1
---
 .../org/apache/bookkeeper/client/LedgerHandle.java |  49 +++-----
 .../apache/bookkeeper/client/LedgerHandleAdv.java  |  38 +++---
 .../org/apache/bookkeeper/client/PendingAddOp.java | 132 +++++++++++++++++----
 3 files changed, 141 insertions(+), 78 deletions(-)

diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
index c61c85b..acb97e0 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
@@ -822,8 +822,8 @@ public class LedgerHandle implements WriteHandle {
 
     public void asyncAddEntry(ByteBuf data, final AddCallback cb, final Object 
ctx) {
         data.retain();
-        PendingAddOp op = new PendingAddOp(LedgerHandle.this, cb, ctx);
-        doAsyncAddEntry(op, data, cb, ctx);
+        PendingAddOp op = PendingAddOp.create(this, data, cb, ctx);
+        doAsyncAddEntry(op);
     }
 
     /**
@@ -864,17 +864,16 @@ public class LedgerHandle implements WriteHandle {
      */
     void asyncRecoveryAddEntry(final byte[] data, final int offset, final int 
length,
                                final AddCallback cb, final Object ctx) {
-        PendingAddOp op = new PendingAddOp(LedgerHandle.this, cb, 
ctx).enableRecoveryAdd();
-        doAsyncAddEntry(op, Unpooled.wrappedBuffer(data, offset, length), cb, 
ctx);
+        PendingAddOp op = PendingAddOp.create(this, 
Unpooled.wrappedBuffer(data, offset, length), cb, ctx)
+                .enableRecoveryAdd();
+        doAsyncAddEntry(op);
     }
 
-    protected void doAsyncAddEntry(final PendingAddOp op, final ByteBuf data, 
final AddCallback cb, final Object ctx) {
+    protected void doAsyncAddEntry(final PendingAddOp op) {
         if (throttler != null) {
             throttler.acquire();
         }
 
-        final long entryId;
-        final long currentLength;
         boolean wasClosed = false;
         synchronized(this) {
             // synchronized on this to ensure that
@@ -882,12 +881,11 @@ public class LedgerHandle implements WriteHandle {
             // updating lastAddPushed
             if (metadata.isClosed()) {
                 wasClosed = true;
-                entryId = -1;
-                currentLength = 0;
             } else {
-                entryId = ++lastAddPushed;
-                currentLength = addToLength(data.readableBytes());
+                long entryId = ++lastAddPushed;
+                long currentLedgerLength = 
addToLength(op.payload.readableBytes());
                 op.setEntryId(entryId);
+                op.setLedgerLength(currentLedgerLength);
                 pendingAddOps.add(op);
             }
         }
@@ -899,8 +897,8 @@ public class LedgerHandle implements WriteHandle {
                     @Override
                     public void safeRun() {
                         LOG.warn("Attempt to add to closed ledger: {}", 
ledgerId);
-                        cb.addComplete(BKException.Code.LedgerClosedException,
-                                LedgerHandle.this, INVALID_ENTRY_ID, ctx);
+                        
op.cb.addComplete(BKException.Code.LedgerClosedException,
+                                LedgerHandle.this, INVALID_ENTRY_ID, op.ctx);
                     }
 
                     @Override
@@ -909,32 +907,17 @@ public class LedgerHandle implements WriteHandle {
                     }
                 });
             } catch (RejectedExecutionException e) {
-                
cb.addComplete(bk.getReturnRc(BKException.Code.InterruptedException),
-                        LedgerHandle.this, INVALID_ENTRY_ID, ctx);
+                
op.cb.addComplete(bk.getReturnRc(BKException.Code.InterruptedException),
+                        LedgerHandle.this, INVALID_ENTRY_ID, op.ctx);
             }
             return;
         }
 
         try {
-            bk.getMainWorkerPool().submitOrdered(ledgerId, new SafeRunnable() {
-                @Override
-                public void safeRun() {
-                    ByteBuf toSend = 
macManager.computeDigestAndPackageForSending(entryId, lastAddConfirmed,
-                            currentLength, data);
-                    try {
-                        op.initiate(toSend, data.readableBytes());
-                    } finally {
-                        toSend.release();
-                    }
-                }
-                @Override
-                public String toString() {
-                    return String.format("AsyncAddEntry(lid=%d, eid=%d)", 
ledgerId, entryId);
-                }
-            });
+            bk.getMainWorkerPool().submitOrdered(ledgerId, op);
         } catch (RejectedExecutionException e) {
-            
cb.addComplete(bk.getReturnRc(BKException.Code.InterruptedException),
-                    LedgerHandle.this, INVALID_ENTRY_ID, ctx);
+            
op.cb.addComplete(bk.getReturnRc(BKException.Code.InterruptedException),
+                    LedgerHandle.this, INVALID_ENTRY_ID, op.ctx);
         }
     }
 
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandleAdv.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandleAdv.java
index 39f9932..28324b1 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandleAdv.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandleAdv.java
@@ -153,15 +153,15 @@ public class LedgerHandleAdv extends LedgerHandle 
implements WriteAdvHandle {
 
     private void asyncAddEntry(final long entryId, ByteBuf data,
             final AddCallback cb, final Object ctx) {
-        PendingAddOp op = new PendingAddOp(this, cb, ctx);
+        PendingAddOp op = PendingAddOp.create(this, data, cb, ctx);
         op.setEntryId(entryId);
+
         if ((entryId <= this.lastAddConfirmed) || pendingAddOps.contains(op)) {
             LOG.error("Trying to re-add duplicate entryid:{}", entryId);
-            cb.addComplete(BKException.Code.DuplicateEntryIdException,
-                    LedgerHandleAdv.this, entryId, ctx);
+            op.submitCallback(BKException.Code.DuplicateEntryIdException);
             return;
         }
-        doAsyncAddEntry(op, data, cb, ctx);
+        doAsyncAddEntry(op);
     }
 
     /**
@@ -170,7 +170,7 @@ public class LedgerHandleAdv extends LedgerHandle 
implements WriteAdvHandle {
      * unaltered in the base class.
      */
     @Override
-    protected void doAsyncAddEntry(final PendingAddOp op, final ByteBuf data, 
final AddCallback cb, final Object ctx) {
+    protected void doAsyncAddEntry(final PendingAddOp op) {
         if (throttler != null) {
             throttler.acquire();
         }
@@ -185,9 +185,10 @@ public class LedgerHandleAdv extends LedgerHandle 
implements WriteAdvHandle {
                 wasClosed = true;
                 currentLength = 0;
             } else {
-                currentLength = addToLength(length);
+                currentLength = addToLength(op.payload.readableBytes());
                 pendingAddOps.add(op);
             }
+            op.setLedgerLength(currentLength);
         }
 
         if (wasClosed) {
@@ -197,8 +198,8 @@ public class LedgerHandleAdv extends LedgerHandle 
implements WriteAdvHandle {
                     @Override
                     public void safeRun() {
                         LOG.warn("Attempt to add to closed ledger: {}", 
ledgerId);
-                        cb.addComplete(BKException.Code.LedgerClosedException,
-                                LedgerHandleAdv.this, op.getEntryId(), ctx);
+                        
op.cb.addComplete(BKException.Code.LedgerClosedException,
+                                LedgerHandleAdv.this, op.getEntryId(), op.ctx);
                     }
                     @Override
                     public String toString() {
@@ -206,28 +207,17 @@ public class LedgerHandleAdv extends LedgerHandle 
implements WriteAdvHandle {
                     }
                 });
             } catch (RejectedExecutionException e) {
-                
cb.addComplete(bk.getReturnRc(BKException.Code.InterruptedException),
-                        LedgerHandleAdv.this, op.getEntryId(), ctx);
+                
op.cb.addComplete(bk.getReturnRc(BKException.Code.InterruptedException),
+                        LedgerHandleAdv.this, op.getEntryId(), op.ctx);
             }
             return;
         }
 
         try {
-            bk.getMainWorkerPool().submit(new SafeRunnable() {
-                @Override
-                public void safeRun() {
-                    ByteBuf toSend = 
macManager.computeDigestAndPackageForSending(op.getEntryId(), lastAddConfirmed,
-                            currentLength, data);
-                    try {
-                        op.initiate(toSend, toSend.readableBytes());
-                    } finally {
-                        toSend.release();
-                    }
-                }
-            });
+            bk.getMainWorkerPool().submitOrdered(ledgerId, op);
         } catch (RejectedExecutionException e) {
-            
cb.addComplete(bk.getReturnRc(BKException.Code.InterruptedException),
-                    LedgerHandleAdv.this, op.getEntryId(), ctx);
+            
op.cb.addComplete(bk.getReturnRc(BKException.Code.InterruptedException),
+                              LedgerHandleAdv.this, op.getEntryId(), op.ctx);
         }
     }
 
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java
index 00a36b3..f4d05b7 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java
@@ -17,7 +17,10 @@
  */
 package org.apache.bookkeeper.client;
 
+import static com.google.common.base.Preconditions.checkNotNull;
 import io.netty.buffer.ByteBuf;
+import io.netty.util.Recycler;
+import io.netty.util.Recycler.Handle;
 import io.netty.util.ReferenceCountUtil;
 import io.netty.util.Timeout;
 import io.netty.util.TimerTask;
@@ -48,9 +51,10 @@ import java.util.concurrent.RejectedExecutionException;
  *
  *
  */
-class PendingAddOp implements WriteCallback, TimerTask {
+class PendingAddOp extends SafeRunnable implements WriteCallback, TimerTask {
     private final static Logger LOG = 
LoggerFactory.getLogger(PendingAddOp.class);
 
+    ByteBuf payload;
     ByteBuf toSend;
     AddCallback cb;
     Object ctx;
@@ -64,21 +68,38 @@ class PendingAddOp implements WriteCallback, TimerTask {
     boolean isRecoveryAdd = false;
     long requestTimeNanos;
 
-    final int timeoutSec;
+    int timeoutSec;
     Timeout timeout = null;
 
     OpStatsLogger addOpLogger;
-    boolean callbackTriggered = false;
-
-    PendingAddOp(LedgerHandle lh, AddCallback cb, Object ctx) {
-        this.lh = lh;
-        this.cb = cb;
-        this.ctx = ctx;
-        this.entryId = LedgerHandle.INVALID_ENTRY_ID;
-
-        this.ackSet = lh.distributionSchedule.getAckSet();
-        this.addOpLogger = lh.bk.getAddOpLogger();
-        this.timeoutSec = lh.bk.getConf().getAddEntryQuorumTimeout();
+    long currentLedgerLength;
+    int pendingWriteRequests;
+    boolean callbackTriggered;
+    boolean hasRun;
+
+    static PendingAddOp create(LedgerHandle lh, ByteBuf payload, AddCallback 
cb, Object ctx) {
+        PendingAddOp op = RECYCLER.get();
+        op.lh = lh;
+        op.isRecoveryAdd = false;
+        op.cb = cb;
+        op.ctx = ctx;
+        op.entryId = LedgerHandle.INVALID_ENTRY_ID;
+        op.currentLedgerLength = -1;
+        op.payload = payload;
+        op.entryLength = payload.readableBytes();
+
+        op.completed = false;
+        op.ackSet = lh.distributionSchedule.getAckSet();
+        op.addOpLogger = lh.bk.getAddOpLogger();
+        if (op.timeout != null) {
+            op.timeout.cancel();
+        }
+        op.timeout = null;
+        op.timeoutSec = lh.bk.getConf().getAddEntryQuorumTimeout();
+        op.pendingWriteRequests = 0;
+        op.callbackTriggered = false;
+        op.hasRun = false;
+        return op;
     }
 
     /**
@@ -94,6 +115,10 @@ class PendingAddOp implements WriteCallback, TimerTask {
         this.entryId = entryId;
     }
 
+    void setLedgerLength(long ledgerLength) {
+        this.currentLedgerLength = ledgerLength;
+    }
+
     long getEntryId() {
         return this.entryId;
     }
@@ -103,6 +128,7 @@ class PendingAddOp implements WriteCallback, TimerTask {
 
         
lh.bk.getBookieClient().addEntry(lh.metadata.currentEnsemble.get(bookieIndex), 
lh.ledgerId, lh.ledgerKey, entryId, toSend,
                 this, bookieIndex, flags);
+        ++pendingWriteRequests;
     }
 
     @Override
@@ -182,21 +208,31 @@ class PendingAddOp implements WriteCallback, TimerTask {
         sendWriteRequest(bookieIndex);
     }
 
-    void initiate(ByteBuf toSend, int entryLength) {
+    /**
+     * Initiate the add operation
+     */
+    public void safeRun() {
+        hasRun = true;
         if (callbackTriggered) {
-            // this should only be true if the request was failed due to 
another request ahead in the pending queue,
+            // this should only be true if the request was failed due
+            // to another request ahead in the pending queue,
             // so we can just ignore this request
+            maybeRecycle();
             return;
         }
 
         if (timeoutSec > -1) {
-            this.timeout = lh.bk.getBookieClient().scheduleTimeout(this, 
timeoutSec, TimeUnit.SECONDS);
+            this.timeout = lh.bk.getBookieClient().scheduleTimeout(
+                    this, timeoutSec, TimeUnit.SECONDS);
         }
+
         this.requestTimeNanos = MathUtils.nowInNano();
-        this.toSend = toSend;
-        // Retain the buffer until all writes are complete
-        this.toSend.retain();
-        this.entryLength = entryLength;
+        checkNotNull(lh);
+        checkNotNull(lh.macManager);
+
+        this.toSend = lh.macManager.computeDigestAndPackageForSending(
+                entryId, lh.lastAddConfirmed, currentLedgerLength,
+                payload);
 
         // Iterate over set and trigger the sendWriteRequests
         DistributionSchedule.WriteSet writeSet
@@ -213,6 +249,7 @@ class PendingAddOp implements WriteCallback, TimerTask {
     @Override
     public void writeComplete(int rc, long ledgerId, long entryId, 
BookieSocketAddress addr, Object ctx) {
         int bookieIndex = (Integer) ctx;
+        --pendingWriteRequests;
 
         if (!lh.metadata.currentEnsemble.get(bookieIndex).equals(addr)) {
             // ensemble has already changed, failure of this addr is immaterial
@@ -246,6 +283,7 @@ class PendingAddOp implements WriteCallback, TimerTask {
             sendAddSuccessCallbacks();
             // I am already finished, ignore incoming responses.
             // otherwise, we might hit the following error handling logic, 
which might cause bad things.
+            maybeRecycle();
             return;
         }
 
@@ -292,7 +330,6 @@ class PendingAddOp implements WriteCallback, TimerTask {
 
         if (ackQuorum && !completed) {
             completed = true;
-            ackSet.recycle();
 
             sendAddSuccessCallbacks();
         }
@@ -324,6 +361,8 @@ class PendingAddOp implements WriteCallback, TimerTask {
         }
         cb.addComplete(rc, lh, entryId, ctx);
         callbackTriggered = true;
+
+        maybeRecycle();
     }
 
     @Override
@@ -348,4 +387,55 @@ class PendingAddOp implements WriteCallback, TimerTask {
        return (this == o);
     }
 
+    private final Handle<PendingAddOp> recyclerHandle;
+    private static final Recycler<PendingAddOp> RECYCLER = new 
Recycler<PendingAddOp>() {
+        protected PendingAddOp newObject(Recycler.Handle<PendingAddOp> handle) 
{
+            return new PendingAddOp(handle);
+        }
+    };
+
+    private PendingAddOp(Handle<PendingAddOp> recyclerHandle) {
+        this.recyclerHandle = recyclerHandle;
+    }
+
+    private void maybeRecycle() {
+        // The reference to PendingAddOp can be held in 3 places
+        // - LedgerHandle#pendingAddOps
+        //   This reference is released when the callback is run
+        // - The executor
+        //   Released after safeRun finishes
+        // - BookieClient
+        //   Holds a reference from the point the addEntry requests are
+        //   sent.
+        // The object can only be recycled after all references are
+        // released, otherwise we could end up recycling twice and all
+        // joy that goes along with that.
+        if (hasRun && callbackTriggered && pendingWriteRequests == 0) {
+            recycle();
+        }
+    }
+
+    private void recycle() {
+        entryId = LedgerHandle.INVALID_ENTRY_ID;
+        currentLedgerLength = -1;
+        payload = null;
+        toSend = null;
+        cb = null;
+        ctx = null;
+        ackSet.recycle();
+        ackSet = null;
+        lh = null;
+        isRecoveryAdd = false;
+        addOpLogger = null;
+        completed = false;
+        pendingWriteRequests = 0;
+        callbackTriggered = false;
+        hasRun = false;
+        if (timeout != null) {
+            timeout.cancel();
+        }
+        timeout = null;
+
+        recyclerHandle.recycle(this);
+    }
 }

-- 
To stop receiving notification emails like this one, please contact
['"[email protected]" <[email protected]>'].

Reply via email to