This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/master by this push:
new 34e8bf2 Recycle PendingAddOps
34e8bf2 is described below
commit 34e8bf200c6f3797bd6fa4c5d86646e9eb7f0d3b
Author: Ivan Kelly <[email protected]>
AuthorDate: Tue Oct 31 19:31:21 2017 -0700
Recycle PendingAddOps
Avoid creating a new PendingAddOp object for each entry added, thus
saving on garbage.
Originally commit 55ba4723 on the yahoo-4.3 branch.
Author: Ivan Kelly <[email protected]>
Author: Matteo Merli <[email protected]>
Reviewers: Jia Zhai <None>, Sijie Guo <[email protected]>
This closes #664 from ivankelly/yahoo-bp-1
---
.../org/apache/bookkeeper/client/LedgerHandle.java | 49 +++-----
.../apache/bookkeeper/client/LedgerHandleAdv.java | 38 +++---
.../org/apache/bookkeeper/client/PendingAddOp.java | 132 +++++++++++++++++----
3 files changed, 141 insertions(+), 78 deletions(-)
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
index c61c85b..acb97e0 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
@@ -822,8 +822,8 @@ public class LedgerHandle implements WriteHandle {
public void asyncAddEntry(ByteBuf data, final AddCallback cb, final Object
ctx) {
data.retain();
- PendingAddOp op = new PendingAddOp(LedgerHandle.this, cb, ctx);
- doAsyncAddEntry(op, data, cb, ctx);
+ PendingAddOp op = PendingAddOp.create(this, data, cb, ctx);
+ doAsyncAddEntry(op);
}
/**
@@ -864,17 +864,16 @@ public class LedgerHandle implements WriteHandle {
*/
void asyncRecoveryAddEntry(final byte[] data, final int offset, final int
length,
final AddCallback cb, final Object ctx) {
- PendingAddOp op = new PendingAddOp(LedgerHandle.this, cb,
ctx).enableRecoveryAdd();
- doAsyncAddEntry(op, Unpooled.wrappedBuffer(data, offset, length), cb,
ctx);
+ PendingAddOp op = PendingAddOp.create(this,
Unpooled.wrappedBuffer(data, offset, length), cb, ctx)
+ .enableRecoveryAdd();
+ doAsyncAddEntry(op);
}
- protected void doAsyncAddEntry(final PendingAddOp op, final ByteBuf data,
final AddCallback cb, final Object ctx) {
+ protected void doAsyncAddEntry(final PendingAddOp op) {
if (throttler != null) {
throttler.acquire();
}
- final long entryId;
- final long currentLength;
boolean wasClosed = false;
synchronized(this) {
// synchronized on this to ensure that
@@ -882,12 +881,11 @@ public class LedgerHandle implements WriteHandle {
// updating lastAddPushed
if (metadata.isClosed()) {
wasClosed = true;
- entryId = -1;
- currentLength = 0;
} else {
- entryId = ++lastAddPushed;
- currentLength = addToLength(data.readableBytes());
+ long entryId = ++lastAddPushed;
+ long currentLedgerLength =
addToLength(op.payload.readableBytes());
op.setEntryId(entryId);
+ op.setLedgerLength(currentLedgerLength);
pendingAddOps.add(op);
}
}
@@ -899,8 +897,8 @@ public class LedgerHandle implements WriteHandle {
@Override
public void safeRun() {
LOG.warn("Attempt to add to closed ledger: {}",
ledgerId);
- cb.addComplete(BKException.Code.LedgerClosedException,
- LedgerHandle.this, INVALID_ENTRY_ID, ctx);
+
op.cb.addComplete(BKException.Code.LedgerClosedException,
+ LedgerHandle.this, INVALID_ENTRY_ID, op.ctx);
}
@Override
@@ -909,32 +907,17 @@ public class LedgerHandle implements WriteHandle {
}
});
} catch (RejectedExecutionException e) {
-
cb.addComplete(bk.getReturnRc(BKException.Code.InterruptedException),
- LedgerHandle.this, INVALID_ENTRY_ID, ctx);
+
op.cb.addComplete(bk.getReturnRc(BKException.Code.InterruptedException),
+ LedgerHandle.this, INVALID_ENTRY_ID, op.ctx);
}
return;
}
try {
- bk.getMainWorkerPool().submitOrdered(ledgerId, new SafeRunnable() {
- @Override
- public void safeRun() {
- ByteBuf toSend =
macManager.computeDigestAndPackageForSending(entryId, lastAddConfirmed,
- currentLength, data);
- try {
- op.initiate(toSend, data.readableBytes());
- } finally {
- toSend.release();
- }
- }
- @Override
- public String toString() {
- return String.format("AsyncAddEntry(lid=%d, eid=%d)",
ledgerId, entryId);
- }
- });
+ bk.getMainWorkerPool().submitOrdered(ledgerId, op);
} catch (RejectedExecutionException e) {
-
cb.addComplete(bk.getReturnRc(BKException.Code.InterruptedException),
- LedgerHandle.this, INVALID_ENTRY_ID, ctx);
+
op.cb.addComplete(bk.getReturnRc(BKException.Code.InterruptedException),
+ LedgerHandle.this, INVALID_ENTRY_ID, op.ctx);
}
}
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandleAdv.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandleAdv.java
index 39f9932..28324b1 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandleAdv.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandleAdv.java
@@ -153,15 +153,15 @@ public class LedgerHandleAdv extends LedgerHandle
implements WriteAdvHandle {
private void asyncAddEntry(final long entryId, ByteBuf data,
final AddCallback cb, final Object ctx) {
- PendingAddOp op = new PendingAddOp(this, cb, ctx);
+ PendingAddOp op = PendingAddOp.create(this, data, cb, ctx);
op.setEntryId(entryId);
+
if ((entryId <= this.lastAddConfirmed) || pendingAddOps.contains(op)) {
LOG.error("Trying to re-add duplicate entryid:{}", entryId);
- cb.addComplete(BKException.Code.DuplicateEntryIdException,
- LedgerHandleAdv.this, entryId, ctx);
+ op.submitCallback(BKException.Code.DuplicateEntryIdException);
return;
}
- doAsyncAddEntry(op, data, cb, ctx);
+ doAsyncAddEntry(op);
}
/**
@@ -170,7 +170,7 @@ public class LedgerHandleAdv extends LedgerHandle
implements WriteAdvHandle {
* unaltered in the base class.
*/
@Override
- protected void doAsyncAddEntry(final PendingAddOp op, final ByteBuf data,
final AddCallback cb, final Object ctx) {
+ protected void doAsyncAddEntry(final PendingAddOp op) {
if (throttler != null) {
throttler.acquire();
}
@@ -185,9 +185,10 @@ public class LedgerHandleAdv extends LedgerHandle
implements WriteAdvHandle {
wasClosed = true;
currentLength = 0;
} else {
- currentLength = addToLength(length);
+ currentLength = addToLength(op.payload.readableBytes());
pendingAddOps.add(op);
}
+ op.setLedgerLength(currentLength);
}
if (wasClosed) {
@@ -197,8 +198,8 @@ public class LedgerHandleAdv extends LedgerHandle
implements WriteAdvHandle {
@Override
public void safeRun() {
LOG.warn("Attempt to add to closed ledger: {}",
ledgerId);
- cb.addComplete(BKException.Code.LedgerClosedException,
- LedgerHandleAdv.this, op.getEntryId(), ctx);
+
op.cb.addComplete(BKException.Code.LedgerClosedException,
+ LedgerHandleAdv.this, op.getEntryId(), op.ctx);
}
@Override
public String toString() {
@@ -206,28 +207,17 @@ public class LedgerHandleAdv extends LedgerHandle
implements WriteAdvHandle {
}
});
} catch (RejectedExecutionException e) {
-
cb.addComplete(bk.getReturnRc(BKException.Code.InterruptedException),
- LedgerHandleAdv.this, op.getEntryId(), ctx);
+
op.cb.addComplete(bk.getReturnRc(BKException.Code.InterruptedException),
+ LedgerHandleAdv.this, op.getEntryId(), op.ctx);
}
return;
}
try {
- bk.getMainWorkerPool().submit(new SafeRunnable() {
- @Override
- public void safeRun() {
- ByteBuf toSend =
macManager.computeDigestAndPackageForSending(op.getEntryId(), lastAddConfirmed,
- currentLength, data);
- try {
- op.initiate(toSend, toSend.readableBytes());
- } finally {
- toSend.release();
- }
- }
- });
+ bk.getMainWorkerPool().submitOrdered(ledgerId, op);
} catch (RejectedExecutionException e) {
-
cb.addComplete(bk.getReturnRc(BKException.Code.InterruptedException),
- LedgerHandleAdv.this, op.getEntryId(), ctx);
+
op.cb.addComplete(bk.getReturnRc(BKException.Code.InterruptedException),
+ LedgerHandleAdv.this, op.getEntryId(), op.ctx);
}
}
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java
index 00a36b3..f4d05b7 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java
@@ -17,7 +17,10 @@
*/
package org.apache.bookkeeper.client;
+import static com.google.common.base.Preconditions.checkNotNull;
import io.netty.buffer.ByteBuf;
+import io.netty.util.Recycler;
+import io.netty.util.Recycler.Handle;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.Timeout;
import io.netty.util.TimerTask;
@@ -48,9 +51,10 @@ import java.util.concurrent.RejectedExecutionException;
*
*
*/
-class PendingAddOp implements WriteCallback, TimerTask {
+class PendingAddOp extends SafeRunnable implements WriteCallback, TimerTask {
private final static Logger LOG =
LoggerFactory.getLogger(PendingAddOp.class);
+ ByteBuf payload;
ByteBuf toSend;
AddCallback cb;
Object ctx;
@@ -64,21 +68,38 @@ class PendingAddOp implements WriteCallback, TimerTask {
boolean isRecoveryAdd = false;
long requestTimeNanos;
- final int timeoutSec;
+ int timeoutSec;
Timeout timeout = null;
OpStatsLogger addOpLogger;
- boolean callbackTriggered = false;
-
- PendingAddOp(LedgerHandle lh, AddCallback cb, Object ctx) {
- this.lh = lh;
- this.cb = cb;
- this.ctx = ctx;
- this.entryId = LedgerHandle.INVALID_ENTRY_ID;
-
- this.ackSet = lh.distributionSchedule.getAckSet();
- this.addOpLogger = lh.bk.getAddOpLogger();
- this.timeoutSec = lh.bk.getConf().getAddEntryQuorumTimeout();
+ long currentLedgerLength;
+ int pendingWriteRequests;
+ boolean callbackTriggered;
+ boolean hasRun;
+
+ static PendingAddOp create(LedgerHandle lh, ByteBuf payload, AddCallback
cb, Object ctx) {
+ PendingAddOp op = RECYCLER.get();
+ op.lh = lh;
+ op.isRecoveryAdd = false;
+ op.cb = cb;
+ op.ctx = ctx;
+ op.entryId = LedgerHandle.INVALID_ENTRY_ID;
+ op.currentLedgerLength = -1;
+ op.payload = payload;
+ op.entryLength = payload.readableBytes();
+
+ op.completed = false;
+ op.ackSet = lh.distributionSchedule.getAckSet();
+ op.addOpLogger = lh.bk.getAddOpLogger();
+ if (op.timeout != null) {
+ op.timeout.cancel();
+ }
+ op.timeout = null;
+ op.timeoutSec = lh.bk.getConf().getAddEntryQuorumTimeout();
+ op.pendingWriteRequests = 0;
+ op.callbackTriggered = false;
+ op.hasRun = false;
+ return op;
}
/**
@@ -94,6 +115,10 @@ class PendingAddOp implements WriteCallback, TimerTask {
this.entryId = entryId;
}
+ void setLedgerLength(long ledgerLength) {
+ this.currentLedgerLength = ledgerLength;
+ }
+
long getEntryId() {
return this.entryId;
}
@@ -103,6 +128,7 @@ class PendingAddOp implements WriteCallback, TimerTask {
lh.bk.getBookieClient().addEntry(lh.metadata.currentEnsemble.get(bookieIndex),
lh.ledgerId, lh.ledgerKey, entryId, toSend,
this, bookieIndex, flags);
+ ++pendingWriteRequests;
}
@Override
@@ -182,21 +208,31 @@ class PendingAddOp implements WriteCallback, TimerTask {
sendWriteRequest(bookieIndex);
}
- void initiate(ByteBuf toSend, int entryLength) {
+ /**
+ * Initiate the add operation
+ */
+ public void safeRun() {
+ hasRun = true;
if (callbackTriggered) {
- // this should only be true if the request was failed due to
another request ahead in the pending queue,
+ // this should only be true if the request was failed due
+ // to another request ahead in the pending queue,
// so we can just ignore this request
+ maybeRecycle();
return;
}
if (timeoutSec > -1) {
- this.timeout = lh.bk.getBookieClient().scheduleTimeout(this,
timeoutSec, TimeUnit.SECONDS);
+ this.timeout = lh.bk.getBookieClient().scheduleTimeout(
+ this, timeoutSec, TimeUnit.SECONDS);
}
+
this.requestTimeNanos = MathUtils.nowInNano();
- this.toSend = toSend;
- // Retain the buffer until all writes are complete
- this.toSend.retain();
- this.entryLength = entryLength;
+ checkNotNull(lh);
+ checkNotNull(lh.macManager);
+
+ this.toSend = lh.macManager.computeDigestAndPackageForSending(
+ entryId, lh.lastAddConfirmed, currentLedgerLength,
+ payload);
// Iterate over set and trigger the sendWriteRequests
DistributionSchedule.WriteSet writeSet
@@ -213,6 +249,7 @@ class PendingAddOp implements WriteCallback, TimerTask {
@Override
public void writeComplete(int rc, long ledgerId, long entryId,
BookieSocketAddress addr, Object ctx) {
int bookieIndex = (Integer) ctx;
+ --pendingWriteRequests;
if (!lh.metadata.currentEnsemble.get(bookieIndex).equals(addr)) {
// ensemble has already changed, failure of this addr is immaterial
@@ -246,6 +283,7 @@ class PendingAddOp implements WriteCallback, TimerTask {
sendAddSuccessCallbacks();
// I am already finished, ignore incoming responses.
// otherwise, we might hit the following error handling logic,
which might cause bad things.
+ maybeRecycle();
return;
}
@@ -292,7 +330,6 @@ class PendingAddOp implements WriteCallback, TimerTask {
if (ackQuorum && !completed) {
completed = true;
- ackSet.recycle();
sendAddSuccessCallbacks();
}
@@ -324,6 +361,8 @@ class PendingAddOp implements WriteCallback, TimerTask {
}
cb.addComplete(rc, lh, entryId, ctx);
callbackTriggered = true;
+
+ maybeRecycle();
}
@Override
@@ -348,4 +387,55 @@ class PendingAddOp implements WriteCallback, TimerTask {
return (this == o);
}
+ private final Handle<PendingAddOp> recyclerHandle;
+ private static final Recycler<PendingAddOp> RECYCLER = new
Recycler<PendingAddOp>() {
+ protected PendingAddOp newObject(Recycler.Handle<PendingAddOp> handle)
{
+ return new PendingAddOp(handle);
+ }
+ };
+
+ private PendingAddOp(Handle<PendingAddOp> recyclerHandle) {
+ this.recyclerHandle = recyclerHandle;
+ }
+
+ private void maybeRecycle() {
+ // The reference to PendingAddOp can be held in 3 places
+ // - LedgerHandle#pendingAddOp
+ // This reference is released when the callback is run
+ // - The executor
+ // Released after safeRun finishes
+ // - BookieClient
+ // Holds a reference from the point the addEntry requests are
+ // sent.
+ // The object can only be recycled after all references are
+ // released, otherwise we could end up recycling twice and all
+ // joy that goes along with that.
+ if (hasRun && callbackTriggered && pendingWriteRequests == 0) {
+ recycle();
+ }
+ }
+
+ private void recycle() {
+ entryId = LedgerHandle.INVALID_ENTRY_ID;
+ currentLedgerLength = -1;
+ payload = null;
+ toSend = null;
+ cb = null;
+ ctx = null;
+ ackSet.recycle();
+ ackSet = null;
+ lh = null;
+ isRecoveryAdd = false;
+ addOpLogger = null;
+ completed = false;
+ pendingWriteRequests = 0;
+ callbackTriggered = false;
+ hasRun = false;
+ if (timeout != null) {
+ timeout.cancel();
+ }
+ timeout = null;
+
+ recyclerHandle.recycle(this);
+ }
}
--
To stop receiving notification emails like this one, please contact
"[email protected]" <[email protected]>.