This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 6af747f [ML] Avoid passing OpAddEntry across a thread boundary in
asyncAddEntry (#12606)
6af747f is described below
commit 6af747f515677796bba343997b2269ffd27cb601
Author: Lari Hotari <[email protected]>
AuthorDate: Fri Nov 5 22:12:47 2021 +0200
[ML] Avoid passing OpAddEntry across a thread boundary in asyncAddEntry
(#12606)
* [ML] Avoid passing OpAddEntry across a thread boundary
* Retain buffer in current thread
---
.../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 20 +++++++++++++-------
.../apache/bookkeeper/mledger/impl/OpAddEntry.java | 18 +++++++++---------
.../bookkeeper/mledger/impl/ManagedLedgerTest.java | 2 +-
3 files changed, 23 insertions(+), 17 deletions(-)
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 4b09db0..d4d0aa3 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -699,10 +699,14 @@ public class ManagedLedgerImpl implements ManagedLedger,
CreateCallback {
log.debug("[{}] asyncAddEntry size={} state={}", name,
buffer.readableBytes(), state);
}
- OpAddEntry addOperation = OpAddEntry.create(this, buffer, callback,
ctx);
+ // retain buffer in this thread
+ buffer.retain();
// Jump to specific thread to avoid contention from writers writing
from different threads
- executor.executeOrdered(name, safeRun(() ->
internalAsyncAddEntry(addOperation)));
+ executor.executeOrdered(name, safeRun(() -> {
+ OpAddEntry addOperation = OpAddEntry.createNoRetainBuffer(this,
buffer, callback, ctx);
+ internalAsyncAddEntry(addOperation);
+ }));
}
@Override
@@ -711,10 +715,14 @@ public class ManagedLedgerImpl implements ManagedLedger,
CreateCallback {
log.debug("[{}] asyncAddEntry size={} state={}", name,
buffer.readableBytes(), state);
}
- OpAddEntry addOperation = OpAddEntry.create(this, buffer,
numberOfMessages, callback, ctx);
+ // retain buffer in this thread
+ buffer.retain();
// Jump to specific thread to avoid contention from writers writing
from different threads
- executor.executeOrdered(name, safeRun(() ->
internalAsyncAddEntry(addOperation)));
+ executor.executeOrdered(name, safeRun(() -> {
+ OpAddEntry addOperation = OpAddEntry.createNoRetainBuffer(this,
buffer, numberOfMessages, callback, ctx);
+ internalAsyncAddEntry(addOperation);
+ }));
}
private synchronized void internalAsyncAddEntry(OpAddEntry addOperation) {
@@ -1501,9 +1509,7 @@ public class ManagedLedgerImpl implements ManagedLedger,
CreateCallback {
// If op is used by another ledger handle, we need to close it
and create a new one
if (existsOp.ledger != null) {
existsOp.close();
- existsOp = OpAddEntry.create(existsOp.ml, existsOp.data,
existsOp.getNumberOfMessages(), existsOp.callback, existsOp.ctx);
- // release the extra retain
- ReferenceCountUtil.release(existsOp.data);
+ existsOp = OpAddEntry.createNoRetainBuffer(existsOp.ml,
existsOp.data, existsOp.getNumberOfMessages(), existsOp.callback, existsOp.ctx);
}
existsOp.setLedger(currentLedger);
pendingAddEntries.add(existsOp);
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java
index 9106b4f..e08b204 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java
@@ -75,16 +75,16 @@ public class OpAddEntry extends SafeRunnable implements
AddCallback, CloseCallba
CLOSED
}
- public static OpAddEntry create(ManagedLedgerImpl ml, ByteBuf data,
AddEntryCallback callback, Object ctx) {
- OpAddEntry op = createOpAddEntry(ml, data, callback, ctx);
+ public static OpAddEntry createNoRetainBuffer(ManagedLedgerImpl ml,
ByteBuf data, AddEntryCallback callback, Object ctx) {
+ OpAddEntry op = createOpAddEntryNoRetainBuffer(ml, data, callback,
ctx);
if (log.isDebugEnabled()) {
log.debug("Created new OpAddEntry {}", op);
}
return op;
}
- public static OpAddEntry create(ManagedLedgerImpl ml, ByteBuf data, int
numberOfMessages, AddEntryCallback callback, Object ctx) {
- OpAddEntry op = createOpAddEntry(ml, data, callback, ctx);
+ public static OpAddEntry createNoRetainBuffer(ManagedLedgerImpl ml,
ByteBuf data, int numberOfMessages, AddEntryCallback callback, Object ctx) {
+ OpAddEntry op = createOpAddEntryNoRetainBuffer(ml, data, callback,
ctx);
op.numberOfMessages = numberOfMessages;
if (log.isDebugEnabled()) {
log.debug("Created new OpAddEntry {}", op);
@@ -92,11 +92,11 @@ public class OpAddEntry extends SafeRunnable implements
AddCallback, CloseCallba
return op;
}
- private static OpAddEntry createOpAddEntry(ManagedLedgerImpl ml, ByteBuf
data, AddEntryCallback callback, Object ctx) {
+ private static OpAddEntry createOpAddEntryNoRetainBuffer(ManagedLedgerImpl
ml, ByteBuf data, AddEntryCallback callback, Object ctx) {
OpAddEntry op = RECYCLER.get();
op.ml = ml;
op.ledger = null;
- op.data = data.retain();
+ op.data = data;
op.dataLength = data.readableBytes();
op.callback = callback;
op.ctx = ctx;
@@ -155,7 +155,7 @@ public class OpAddEntry extends SafeRunnable implements
AddCallback, CloseCallba
}
checkArgument(ledger.getId() == lh.getId(), "ledgerId %s doesn't match
with acked ledgerId %s", ledger.getId(),
lh.getId());
-
+
if (!checkAndCompleteOp(ctx)) {
// means callback might have been completed by different thread
(timeout task thread).. so do nothing
return;
@@ -255,7 +255,7 @@ public class OpAddEntry extends SafeRunnable implements
AddCallback, CloseCallba
/**
* Checks if add-operation is completed
- *
+ *
* @return true if task is not already completed else returns false.
*/
private boolean checkAndCompleteOp(Object ctx) {
@@ -276,7 +276,7 @@ public class OpAddEntry extends SafeRunnable implements
AddCallback, CloseCallba
/**
* It handles add failure on the given ledger. it can be triggered when
add-entry fails or times out.
- *
+ *
* @param lh
*/
void handleAddFailure(final LedgerHandle lh) {
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
index d10fcdd..d837651 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
@@ -2830,7 +2830,7 @@ public class ManagedLedgerTest extends
MockedBookKeeperTestCase {
List<OpAddEntry> oldOps = new ArrayList<>();
for (int i = 0; i < 10; i++) {
- OpAddEntry op = OpAddEntry.create(ledger,
ByteBufAllocator.DEFAULT.buffer(128), null, null);
+ OpAddEntry op = OpAddEntry.createNoRetainBuffer(ledger,
ByteBufAllocator.DEFAULT.buffer(128).retain(), null, null);
if (i > 4) {
op.setLedger(mock(LedgerHandle.class));
}