This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 6af747f  [ML] Avoid passing OpAddEntry across a thread boundary in asyncAddEntry (#12606)
6af747f is described below

commit 6af747f515677796bba343997b2269ffd27cb601
Author: Lari Hotari <[email protected]>
AuthorDate: Fri Nov 5 22:12:47 2021 +0200

    [ML] Avoid passing OpAddEntry across a thread boundary in asyncAddEntry (#12606)
    
    * [ML] Avoid passing OpAddEntry across a thread boundary
    
    * Retain buffer in current thread
---
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java   | 20 +++++++++++++-------
 .../apache/bookkeeper/mledger/impl/OpAddEntry.java   | 18 +++++++++---------
 .../bookkeeper/mledger/impl/ManagedLedgerTest.java   |  2 +-
 3 files changed, 23 insertions(+), 17 deletions(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 4b09db0..d4d0aa3 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -699,10 +699,14 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
             log.debug("[{}] asyncAddEntry size={} state={}", name, buffer.readableBytes(), state);
         }
 
-        OpAddEntry addOperation = OpAddEntry.create(this, buffer, callback, ctx);
+        // retain buffer in this thread
+        buffer.retain();
 
         // Jump to specific thread to avoid contention from writers writing from different threads
-        executor.executeOrdered(name, safeRun(() -> internalAsyncAddEntry(addOperation)));
+        executor.executeOrdered(name, safeRun(() -> {
+            OpAddEntry addOperation = OpAddEntry.createNoRetainBuffer(this, buffer, callback, ctx);
+            internalAsyncAddEntry(addOperation);
+        }));
     }
 
     @Override
@@ -711,10 +715,14 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
             log.debug("[{}] asyncAddEntry size={} state={}", name, buffer.readableBytes(), state);
         }
 
-        OpAddEntry addOperation = OpAddEntry.create(this, buffer, numberOfMessages, callback, ctx);
+        // retain buffer in this thread
+        buffer.retain();
 
         // Jump to specific thread to avoid contention from writers writing from different threads
-        executor.executeOrdered(name, safeRun(() -> internalAsyncAddEntry(addOperation)));
+        executor.executeOrdered(name, safeRun(() -> {
+            OpAddEntry addOperation = OpAddEntry.createNoRetainBuffer(this, buffer, numberOfMessages, callback, ctx);
+            internalAsyncAddEntry(addOperation);
+        }));
     }
 
     private synchronized void internalAsyncAddEntry(OpAddEntry addOperation) {
@@ -1501,9 +1509,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
                 // If op is used by another ledger handle, we need to close it and create a new one
                 if (existsOp.ledger != null) {
                     existsOp.close();
-                    existsOp = OpAddEntry.create(existsOp.ml, existsOp.data, existsOp.getNumberOfMessages(), existsOp.callback, existsOp.ctx);
-                    // release the extra retain
-                    ReferenceCountUtil.release(existsOp.data);
+                    existsOp = OpAddEntry.createNoRetainBuffer(existsOp.ml, existsOp.data, existsOp.getNumberOfMessages(), existsOp.callback, existsOp.ctx);
                 }
                 existsOp.setLedger(currentLedger);
                 pendingAddEntries.add(existsOp);
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java
index 9106b4f..e08b204 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java
@@ -75,16 +75,16 @@ public class OpAddEntry extends SafeRunnable implements AddCallback, CloseCallba
         CLOSED
     }
 
-    public static OpAddEntry create(ManagedLedgerImpl ml, ByteBuf data, AddEntryCallback callback, Object ctx) {
-        OpAddEntry op = createOpAddEntry(ml, data, callback, ctx);
+    public static OpAddEntry createNoRetainBuffer(ManagedLedgerImpl ml, ByteBuf data, AddEntryCallback callback, Object ctx) {
+        OpAddEntry op = createOpAddEntryNoRetainBuffer(ml, data, callback, ctx);
         if (log.isDebugEnabled()) {
             log.debug("Created new OpAddEntry {}", op);
         }
         return op;
     }
 
-    public static OpAddEntry create(ManagedLedgerImpl ml, ByteBuf data, int numberOfMessages, AddEntryCallback callback, Object ctx) {
-        OpAddEntry op = createOpAddEntry(ml, data, callback, ctx);
+    public static OpAddEntry createNoRetainBuffer(ManagedLedgerImpl ml, ByteBuf data, int numberOfMessages, AddEntryCallback callback, Object ctx) {
+        OpAddEntry op = createOpAddEntryNoRetainBuffer(ml, data, callback, ctx);
         op.numberOfMessages = numberOfMessages;
         if (log.isDebugEnabled()) {
             log.debug("Created new OpAddEntry {}", op);
@@ -92,11 +92,11 @@ public class OpAddEntry extends SafeRunnable implements AddCallback, CloseCallba
         return op;
     }
 
-    private static OpAddEntry createOpAddEntry(ManagedLedgerImpl ml, ByteBuf data, AddEntryCallback callback, Object ctx) {
+    private static OpAddEntry createOpAddEntryNoRetainBuffer(ManagedLedgerImpl ml, ByteBuf data, AddEntryCallback callback, Object ctx) {
         OpAddEntry op = RECYCLER.get();
         op.ml = ml;
         op.ledger = null;
-        op.data = data.retain();
+        op.data = data;
         op.dataLength = data.readableBytes();
         op.callback = callback;
         op.ctx = ctx;
@@ -155,7 +155,7 @@ public class OpAddEntry extends SafeRunnable implements AddCallback, CloseCallba
         }
         checkArgument(ledger.getId() == lh.getId(), "ledgerId %s doesn't match with acked ledgerId %s", ledger.getId(),
                 lh.getId());
-        
+
         if (!checkAndCompleteOp(ctx)) {
             // means callback might have been completed by different thread (timeout task thread).. so do nothing
             return;
@@ -255,7 +255,7 @@ public class OpAddEntry extends SafeRunnable implements AddCallback, CloseCallba
 
     /**
      * Checks if add-operation is completed
-     * 
+     *
      * @return true if task is not already completed else returns false.
      */
     private boolean checkAndCompleteOp(Object ctx) {
@@ -276,7 +276,7 @@ public class OpAddEntry extends SafeRunnable implements AddCallback, CloseCallba
 
     /**
      * It handles add failure on the given ledger. it can be triggered when add-entry fails or times out.
-     * 
+     *
      * @param lh
      */
     void handleAddFailure(final LedgerHandle lh) {
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
index d10fcdd..d837651 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
@@ -2830,7 +2830,7 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase {
 
         List<OpAddEntry> oldOps = new ArrayList<>();
         for (int i = 0; i < 10; i++) {
-            OpAddEntry op = OpAddEntry.create(ledger, ByteBufAllocator.DEFAULT.buffer(128), null, null);
+            OpAddEntry op = OpAddEntry.createNoRetainBuffer(ledger, ByteBufAllocator.DEFAULT.buffer(128).retain(), null, null);
             if (i > 4) {
                 op.setLedger(mock(LedgerHandle.class));
             }

Reply via email to