Copilot commented on code in PR #25240:
URL: https://github.com/apache/pulsar/pull/25240#discussion_r2793210568


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java:
##########
@@ -1751,20 +1752,24 @@ public void operationComplete(Void v, Stat stat) {
                         log.debug("[{}] Updating of ledgers list after create complete. version={}", name, stat);
                     }
                     ledgersStat = stat;
-                    synchronized (ManagedLedgerImpl.this) {
-                        LedgerHandle originalCurrentLedger = currentLedger;
-                        ledgers.put(lh.getId(), newLedger);
-                        currentLedger = lh;
-                        currentLedgerTimeoutTriggered = new AtomicBoolean();
-                        currentLedgerEntries = 0;
-                        currentLedgerSize = 0;
-                        updateLedgersIdsComplete(originalCurrentLedger);
-                        mbean.addLedgerSwitchLatencySample(System.currentTimeMillis()
-                                - lastLedgerCreationInitiationTimestamp, TimeUnit.MILLISECONDS);
-                        // May need to update the cursor position
-                        maybeUpdateCursorBeforeTrimmingConsumedLedger();
-                    }
-                    metadataMutex.unlock();
+                    // make sure that pendingAddEntries' opeartions are 
executed in the same thread
+                    // to avoid potential concurrent issues
+                    executor.execute(() -> {
+                        synchronized (ManagedLedgerImpl.this) {
+                            LedgerHandle originalCurrentLedger = currentLedger;
+                            ledgers.put(lh.getId(), newLedger);
+                            currentLedger = lh;
+                            currentLedgerTimeoutTriggered = new AtomicBoolean();
+                            currentLedgerEntries = 0;
+                            currentLedgerSize = 0;
+                            updateLedgersIdsComplete(originalCurrentLedger);
+                            mbean.addLedgerSwitchLatencySample(System.currentTimeMillis()
+                                    - lastLedgerCreationInitiationTimestamp, TimeUnit.MILLISECONDS);
+                            // May need to update the cursor position
+                            maybeUpdateCursorBeforeTrimmingConsumedLedger();

Review Comment:
   The new executor.execute() block updates ledgers/currentLedger and calls 
updateLedgersIdsComplete() without re-checking the ManagedLedger state. Since 
this work is now deferred, it can run after asyncClose()/setFenced and 
potentially transition the state back to LedgerOpened and/or touch ledgers 
after closure. Consider checking STATE_UPDATER.get(this) for Closed or isFenced 
inside the executor task and bailing out (while still releasing metadataMutex) 
if the ledger is no longer writable.
   ```suggestion
                               // Re-check state after deferring to executor to avoid mutating a closed/fenced ledger
                               if (STATE_UPDATER.get(ManagedLedgerImpl.this) == State.Closed || isFenced()) {
                                   if (log.isDebugEnabled()) {
                                       log.debug("[{}] Skipping ledger update after create complete because ledger is "
                                               + "closed or fenced", name);
                                   }
                               } else {
                                   LedgerHandle originalCurrentLedger = currentLedger;
                                   ledgers.put(lh.getId(), newLedger);
                                   currentLedger = lh;
                                   currentLedgerTimeoutTriggered = new AtomicBoolean();
                                   currentLedgerEntries = 0;
                                   currentLedgerSize = 0;
                                   updateLedgersIdsComplete(originalCurrentLedger);
                                   mbean.addLedgerSwitchLatencySample(System.currentTimeMillis()
                                           - lastLedgerCreationInitiationTimestamp, TimeUnit.MILLISECONDS);
                                   // May need to update the cursor position
                                   maybeUpdateCursorBeforeTrimmingConsumedLedger();
                               }
   ```
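
A minimal, self-contained sketch of the pattern described above: a deferred task that re-checks state before mutating and always releases the mutex. Everything in it is a hypothetical stand-in (`DeferredSwitchSketch`, the reduced `State` enum, and the `AtomicBoolean` used in place of `metadataMutex`); it is not the Pulsar implementation and only illustrates the control flow.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in for ManagedLedgerImpl; none of these names are Pulsar APIs.
class DeferredSwitchSketch {
    enum State { CreatingLedger, LedgerOpened, Closed, Fenced }

    final AtomicReference<State> state = new AtomicReference<>(State.CreatingLedger);
    // Assumption: a boolean flag models metadataMutex (true while "locked").
    final AtomicBoolean metadataMutexHeld = new AtomicBoolean(true);
    long currentLedgerId = -1L;

    // Body of the deferred task. Returns true only if the ledger switch was applied.
    boolean applyLedgerSwitch(long newLedgerId) {
        try {
            synchronized (this) {
                State s = state.get();
                if (s == State.Closed || s == State.Fenced) {
                    return false; // closed or fenced while the task was queued: do not mutate
                }
                currentLedgerId = newLedgerId;
                state.set(State.LedgerOpened);
                return true;
            }
        } finally {
            metadataMutexHeld.set(false); // released on both the skip path and the apply path
        }
    }

    public static void main(String[] args) throws Exception {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            DeferredSwitchSketch ml = new DeferredSwitchSketch();
            ml.state.set(State.Closed); // close wins the race before the task runs
            Callable<Boolean> task = () -> ml.applyLedgerSwitch(42L);
            boolean applied = executor.submit(task).get();
            System.out.println("applied=" + applied + ", state=" + ml.state.get());
        } finally {
            executor.shutdown();
        }
    }
}
```

Putting the release in a `finally` block is one way to guarantee the mutex is freed even when the task bails out early; whether that fits the real code depends on how `metadataMutex` is used elsewhere in the class.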



##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java:
##########
@@ -2070,6 +2078,17 @@ void clearPendingAddEntries(ManagedLedgerException e) {
         }
     }
 
+    void clearNotInitiatedPendingAddEntries(ManagedLedgerException e) {
+        Iterator<OpAddEntry> iterator = pendingAddEntries.iterator();
+        while (iterator.hasNext()) {
+            OpAddEntry op = iterator.next();
+            if (op.closeIfNotInitiated()) {
+                op.failed(e);
+                iterator.remove();
+            }
+        }

Review Comment:
   There are existing tests in this module for pendingAddEntries behavior, but 
the new leak-prevention logic (clearNotInitiatedPendingAddEntries + 
OpAddEntry.closeIfNotInitiated) is not covered. Please add tests for the two 
scenarios described in the PR (close/fence occurring while ops are queued but 
not yet initiated, and fenced write-failure path clearing subsequent queued 
ops) to prevent regressions.
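
As a rough illustration of what such a test could assert, here is a self-contained model of the queue-clearing logic. It does not use the real `ManagedLedgerImpl` or `OpAddEntry`: `PendingAddSketch`, `Op`, and the three-value `OpState` are hypothetical, and the compare-and-set semantics given to `closeIfNotInitiated()` are an assumption about the PR, not taken from it.

```java
import java.util.Iterator;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical model of the pending-add queue; not the Pulsar classes.
class PendingAddSketch {
    enum OpState { OPEN, INITIATED, CLOSED }

    static final class Op {
        final AtomicReference<OpState> state = new AtomicReference<>(OpState.OPEN);
        Exception failure; // set when the op is failed

        // Marks the op as handed to the storage layer; fails if it was already closed.
        boolean initiate() {
            return state.compareAndSet(OpState.OPEN, OpState.INITIATED);
        }

        // Assumed semantics: succeeds only for ops that were never initiated.
        boolean closeIfNotInitiated() {
            return state.compareAndSet(OpState.OPEN, OpState.CLOSED);
        }

        void failed(Exception e) {
            failure = e;
        }
    }

    final Queue<Op> pendingAddEntries = new ConcurrentLinkedQueue<>();

    // Same loop shape as the method in the diff above.
    void clearNotInitiatedPendingAddEntries(Exception e) {
        Iterator<Op> iterator = pendingAddEntries.iterator();
        while (iterator.hasNext()) {
            Op op = iterator.next();
            if (op.closeIfNotInitiated()) {
                op.failed(e);
                iterator.remove();
            }
        }
    }

    public static void main(String[] args) {
        PendingAddSketch ml = new PendingAddSketch();
        Op inFlight = new Op();
        Op queued = new Op();
        ml.pendingAddEntries.add(inFlight);
        ml.pendingAddEntries.add(queued);
        inFlight.initiate(); // already sent, must be left for its own callback

        ml.clearNotInitiatedPendingAddEntries(new IllegalStateException("closed"));
        System.out.println("remaining=" + ml.pendingAddEntries.size()
                + ", queuedFailed=" + (queued.failure != null));
    }
}
```

A real test would drive the same two scenarios through the managed ledger itself and check that the add callbacks fire with the expected exception type.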



##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java:
##########
@@ -1937,6 +1942,9 @@ synchronized void ledgerClosed(final LedgerHandle lh, Long lastAddConfirmed) {
             // The managed ledger was closed during the write operation
             clearPendingAddEntries(new ManagedLedgerAlreadyClosedException("Managed ledger was already closed"));
             return;
+        } else if (state == State.Fenced) {
+            clearPendingAddEntries(new ManagedLedgerFencedException("Managed ledger is fenced"));

Review Comment:
   ledgerClosed() only checks `state == State.Fenced`, but 
`State.FencedForDeletion` also returns true for state.isFenced(). In that case 
pending add entries would still not be cleared, which can reintroduce the leak 
for fenced-for-deletion. Consider using `state.isFenced()` here (and keep the 
exception message aligned with the specific fenced state if needed).
   ```suggestion
           } else if (state.isFenced()) {
               clearPendingAddEntries(new ManagedLedgerFencedException(
                       "Managed ledger is fenced (" + state + ")"));
   ```
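
The difference between the two checks can be seen in a small stand-alone sketch. The enum below is a hypothetical, reduced copy of the state enum; only the behavior stated above (`isFenced()` is true for both `Fenced` and `FencedForDeletion`) is assumed.

```java
// Hypothetical reduced state enum; not the full enum from ManagedLedgerImpl.
class FencedCheckSketch {
    enum State {
        LedgerOpened, Closed, Fenced, FencedForDeletion;

        // Assumed to mirror the behavior described in the review comment.
        boolean isFenced() {
            return this == Fenced || this == FencedForDeletion;
        }
    }

    // The check as written in the PR: matches exactly one fenced state.
    static boolean equalityCheck(State s) {
        return s == State.Fenced;
    }

    // The suggested check: matches every fenced state.
    static boolean predicateCheck(State s) {
        return s.isFenced();
    }

    public static void main(String[] args) {
        for (State s : State.values()) {
            System.out.println(s + ": equality=" + equalityCheck(s)
                    + ", predicate=" + predicateCheck(s));
        }
    }
}
```

The two checks disagree only for `FencedForDeletion`, which is the case the comment is concerned about.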



##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java:
##########
@@ -1751,20 +1752,24 @@ public void operationComplete(Void v, Stat stat) {
                         log.debug("[{}] Updating of ledgers list after create complete. version={}", name, stat);
                     }
                     ledgersStat = stat;
-                    synchronized (ManagedLedgerImpl.this) {
-                        LedgerHandle originalCurrentLedger = currentLedger;
-                        ledgers.put(lh.getId(), newLedger);
-                        currentLedger = lh;
-                        currentLedgerTimeoutTriggered = new AtomicBoolean();
-                        currentLedgerEntries = 0;
-                        currentLedgerSize = 0;
-                        updateLedgersIdsComplete(originalCurrentLedger);
-                        mbean.addLedgerSwitchLatencySample(System.currentTimeMillis()
-                                - lastLedgerCreationInitiationTimestamp, TimeUnit.MILLISECONDS);
-                        // May need to update the cursor position
-                        maybeUpdateCursorBeforeTrimmingConsumedLedger();
-                    }
-                    metadataMutex.unlock();
+                    // make sure that pendingAddEntries' opeartions are 
executed in the same thread

Review Comment:
   The comment has a typo: "opeartions" → "operations".
   ```suggestion
                       // make sure that pendingAddEntries' operations are executed in the same thread
   ```



##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java:
##########
@@ -1652,6 +1652,7 @@ public synchronized void asyncClose(final CloseCallback callback, final Object c
 
         factory.close(this);
         STATE_UPDATER.set(this, State.Closed);
+        executor.execute(() -> clearNotInitiatedPendingAddEntries(new ManagedLedgerException("Managed ledger is closed")));

Review Comment:
   In asyncClose(), the exception passed to 
clearNotInitiatedPendingAddEntries() is a generic 
ManagedLedgerException("Managed ledger is closed"). This makes failures for 
queued-but-not-initiated adds inconsistent with the other closed paths (e.g., 
internalAsyncAddEntry uses ManagedLedgerAlreadyClosedException) and can break 
callers that key off the specific subtype. Consider using 
ManagedLedgerAlreadyClosedException here, or at least matching the existing 
close message pattern.
   ```suggestion
           executor.execute(() -> clearNotInitiatedPendingAddEntries(
                   new ManagedLedgerAlreadyClosedException("Managed ledger is closed")));
   ```
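
To show why the subtype matters, here is a small sketch of a caller that branches on the exception type. The two exception classes are local stand-ins that only mirror the parent/child relationship implied above, and `classify` with its retry policy is a hypothetical caller, not code from Pulsar.

```java
// Local stand-ins for the exception hierarchy; not the Pulsar classes.
class CloseExceptionSketch {
    static class ManagedLedgerException extends Exception {
        ManagedLedgerException(String msg) {
            super(msg);
        }
    }

    static class ManagedLedgerAlreadyClosedException extends ManagedLedgerException {
        ManagedLedgerAlreadyClosedException(String msg) {
            super(msg);
        }
    }

    // Hypothetical caller that keys off the specific subtype.
    static String classify(ManagedLedgerException e) {
        if (e instanceof ManagedLedgerAlreadyClosedException) {
            return "closed: do not retry";
        }
        return "unknown: retry";
    }

    public static void main(String[] args) {
        // Same message, different type: only the subtype is recognized as a close.
        System.out.println(classify(new ManagedLedgerException("Managed ledger is closed")));
        System.out.println(classify(new ManagedLedgerAlreadyClosedException("Managed ledger is closed")));
    }
}
```

With the generic type, a caller like this would treat a close as a retryable failure even though the message says otherwise.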



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
