merlimat commented on a change in pull request #10755:
URL: https://github.com/apache/pulsar/pull/10755#discussion_r643271542



##########
File path: 
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java
##########
@@ -178,8 +178,21 @@ public void addComplete(int rc, final LedgerHandle lh, 
long entryId, Object ctx)
     @Override
     public void safeRun() {
         // Remove this entry from the head of the pending queue
-        OpAddEntry firstInQueue = ml.pendingAddEntries.poll();
-        checkArgument(this == firstInQueue);

Review comment:
       I'd leave this change out of this PR. This is a "logic" check. If this 
fails it means we really have a code bug issue and we cannot make many 
assumptions on how to recover from it.

##########
File path: 
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java
##########
@@ -201,42 +214,47 @@ public void safeRun() {
             // `data` will be released in `closeComplete`
             ledger.asyncClose(this, ctx);
         } else {
-            updateLatency();
-            AddEntryCallback cb = callbackUpdater.getAndSet(this, null);
-            if (cb != null) {
-                cb.addComplete(lastEntry, data.asReadOnly(), ctx);
-                ml.notifyCursors();
-                ml.notifyWaitingEntryCallBacks();
-                ReferenceCountUtil.release(data);
-                this.recycle();
-            } else {
+            ByteBuf data = this.data;
+            try {

Review comment:
       The change here LGTM, although I'd prefer to get it in a separate PR. 
Also, do you have a stack trace on why the exception would have been triggered 
here?

##########
File path: 
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
##########
@@ -1504,6 +1504,8 @@ public synchronized void updateLedgersIdsComplete(Stat 
stat) {
                 if (existsOp.ledger != null) {
                     existsOp.close();
                     existsOp = OpAddEntry.create(existsOp.ml, existsOp.data, 
existsOp.getNumberOfMessages(), existsOp.callback, existsOp.ctx);
+                    // release the extra retain
+                    ReferenceCountUtil.release(existsOp.data);

Review comment:
       👍  I think this was introduced in #5942. 

##########
File path: 
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java
##########
@@ -201,42 +214,47 @@ public void safeRun() {
             // `data` will be released in `closeComplete`
             ledger.asyncClose(this, ctx);
         } else {
-            updateLatency();
-            AddEntryCallback cb = callbackUpdater.getAndSet(this, null);
-            if (cb != null) {
-                cb.addComplete(lastEntry, data.asReadOnly(), ctx);
-                ml.notifyCursors();
-                ml.notifyWaitingEntryCallBacks();
-                ReferenceCountUtil.release(data);
-                this.recycle();
-            } else {
+            ByteBuf data = this.data;
+            try {
+                updateLatency();
+                AddEntryCallback cb = callbackUpdater.getAndSet(this, null);
+                if (cb != null) {
+                    cb.addComplete(lastEntry, data.asReadOnly(), ctx);
+                    ml.notifyCursors();
+                    ml.notifyWaitingEntryCallBacks();
+                    this.recycle();
+                }
+            } finally {
                 ReferenceCountUtil.release(data);
             }
         }
     }
 
     @Override
     public void closeComplete(int rc, LedgerHandle lh, Object ctx) {
-        checkArgument(ledger.getId() == lh.getId(), "ledgerId %s doesn't match 
with acked ledgerId %s", ledger.getId(),
-                lh.getId());
-
-        if (rc == BKException.Code.OK) {
-            log.debug("Successfully closed ledger {}", lh.getId());
-        } else {
-            log.warn("Error when closing ledger {}. Status={}", lh.getId(), 
BKException.getMessage(rc));
-        }
+        ByteBuf data = this.data;
+        try {
+            checkArgument(ledger.getId() == lh.getId(), "ledgerId %s doesn't 
match with acked ledgerId %s",

Review comment:
       Same comment for the other assertion above. This is really a code-sanity 
check. If this fails, it means things are **very** broken. 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to