eolivelli commented on a change in pull request #10755:
URL: https://github.com/apache/pulsar/pull/10755#discussion_r642649741



##########
File path: 
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java
##########
@@ -201,42 +214,47 @@ public void safeRun() {
             // `data` will be released in `closeComplete`
             ledger.asyncClose(this, ctx);
         } else {
-            updateLatency();
-            AddEntryCallback cb = callbackUpdater.getAndSet(this, null);
-            if (cb != null) {
-                cb.addComplete(lastEntry, data.asReadOnly(), ctx);
-                ml.notifyCursors();
-                ml.notifyWaitingEntryCallBacks();
-                ReferenceCountUtil.release(data);
-                this.recycle();
-            } else {
+            ByteBuf data = this.data;
+            try {
+                updateLatency();
+                AddEntryCallback cb = callbackUpdater.getAndSet(this, null);
+                if (cb != null) {
+                    cb.addComplete(lastEntry, data.asReadOnly(), ctx);
+                    ml.notifyCursors();
+                    ml.notifyWaitingEntryCallBacks();
+                    this.recycle();

Review comment:
       it looks like now we are calling `recycle` before 
`ReferenceCountUtil.release(data);`
   may it be a problem ?

##########
File path: 
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java
##########
@@ -201,42 +214,47 @@ public void safeRun() {
             // `data` will be released in `closeComplete`
             ledger.asyncClose(this, ctx);
         } else {
-            updateLatency();
-            AddEntryCallback cb = callbackUpdater.getAndSet(this, null);
-            if (cb != null) {
-                cb.addComplete(lastEntry, data.asReadOnly(), ctx);
-                ml.notifyCursors();
-                ml.notifyWaitingEntryCallBacks();
-                ReferenceCountUtil.release(data);
-                this.recycle();
-            } else {
+            ByteBuf data = this.data;
+            try {
+                updateLatency();
+                AddEntryCallback cb = callbackUpdater.getAndSet(this, null);
+                if (cb != null) {
+                    cb.addComplete(lastEntry, data.asReadOnly(), ctx);
+                    ml.notifyCursors();
+                    ml.notifyWaitingEntryCallBacks();
+                    this.recycle();
+                }
+            } finally {
                 ReferenceCountUtil.release(data);
             }
         }
     }
 
     @Override
     public void closeComplete(int rc, LedgerHandle lh, Object ctx) {
-        checkArgument(ledger.getId() == lh.getId(), "ledgerId %s doesn't match 
with acked ledgerId %s", ledger.getId(),
-                lh.getId());
-
-        if (rc == BKException.Code.OK) {
-            log.debug("Successfully closed ledger {}", lh.getId());
-        } else {
-            log.warn("Error when closing ledger {}. Status={}", lh.getId(), 
BKException.getMessage(rc));
-        }
+        ByteBuf data = this.data;
+        try {
+            checkArgument(ledger.getId() == lh.getId(), "ledgerId %s doesn't 
match with acked ledgerId %s",
+                    ledger.getId(),
+                    lh.getId());
+
+            if (rc == BKException.Code.OK) {
+                log.debug("Successfully closed ledger {}", lh.getId());
+            } else {
+                log.warn("Error when closing ledger {}. Status={}", 
lh.getId(), BKException.getMessage(rc));
+            }
 
-        ml.ledgerClosed(lh);
-        updateLatency();
+            ml.ledgerClosed(lh);
+            updateLatency();
 
-        AddEntryCallback cb = callbackUpdater.getAndSet(this, null);
-        if (cb != null) {
-            cb.addComplete(PositionImpl.get(lh.getId(), entryId), 
data.asReadOnly(), ctx);
-            ml.notifyCursors();
-            ml.notifyWaitingEntryCallBacks();
-            ReferenceCountUtil.release(data);
-            this.recycle();
-        } else {
+            AddEntryCallback cb = callbackUpdater.getAndSet(this, null);
+            if (cb != null) {
+                cb.addComplete(PositionImpl.get(lh.getId(), entryId), 
data.asReadOnly(), ctx);
+                ml.notifyCursors();
+                ml.notifyWaitingEntryCallBacks();
+                this.recycle();

Review comment:
       the same here

##########
File path: 
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
##########
@@ -1536,6 +1538,11 @@ public synchronized void updateLedgersIdsComplete(Stat 
stat) {
         }
     }
 
+    synchronized OpAddEntry pollPendingAddEntry() {
+        return pendingAddEntries.poll();

Review comment:
       should we make pendingAddEntries  'private' ?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to