Denovo1998 commented on code in PR #25240:
URL: https://github.com/apache/pulsar/pull/25240#discussion_r2810380181


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java:
##########
@@ -1751,20 +1753,32 @@ public void operationComplete(Void v, Stat stat) {
                         log.debug("[{}] Updating of ledgers list after create complete. version={}", name, stat);
                     }
                     ledgersStat = stat;
-                    synchronized (ManagedLedgerImpl.this) {
-                        LedgerHandle originalCurrentLedger = currentLedger;
-                        ledgers.put(lh.getId(), newLedger);
-                        currentLedger = lh;
-                        currentLedgerTimeoutTriggered = new AtomicBoolean();
-                        currentLedgerEntries = 0;
-                        currentLedgerSize = 0;
-                        updateLedgersIdsComplete(originalCurrentLedger);
-                        mbean.addLedgerSwitchLatencySample(System.currentTimeMillis()
-                                - lastLedgerCreationInitiationTimestamp, TimeUnit.MILLISECONDS);
-                        // May need to update the cursor position
-                        maybeUpdateCursorBeforeTrimmingConsumedLedger();
+                    // make sure that pendingAddEntries' operations are executed in the same thread
+                    // to avoid potential concurrent issues
+                    State state = STATE_UPDATER.get(ManagedLedgerImpl.this);
+                    if (state == State.Closed || state.isFenced()) {
+                        if (log.isDebugEnabled()) {
+                            log.debug("[{}] Skipping ledger update after create complete because ledger is "
+                                + "closed or fenced", name);
+                        }
+                    } else {
+                        executor.execute(() -> {
+                            synchronized (ManagedLedgerImpl.this) {
+                                LedgerHandle originalCurrentLedger = currentLedger;
+                                ledgers.put(lh.getId(), newLedger);
+                                currentLedger = lh;
+                                currentLedgerTimeoutTriggered = new AtomicBoolean();
+                                currentLedgerEntries = 0;
+                                currentLedgerSize = 0;
+                                updateLedgersIdsComplete(originalCurrentLedger);
+                                mbean.addLedgerSwitchLatencySample(System.currentTimeMillis()
+                                        - lastLedgerCreationInitiationTimestamp, TimeUnit.MILLISECONDS);
+                                // May need to update the cursor position
+                                maybeUpdateCursorBeforeTrimmingConsumedLedger();
+                            }
+                            metadataMutex.unlock();

Review Comment:
   Please re-check STATE_UPDATER inside the executor task and bail out if
Closed/Fenced (while still releasing metadataMutex), since the state may change
between this check and the moment the task actually runs.
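A minimal sketch of the suggested double check, assuming a heavily simplified model of the rollover path. `RecheckStateSketch`, `onLedgersListUpdated`, `currentLedgerId` and `scenario` are hypothetical names, the executor is assumed to be single-threaded, and `metadataMutex` is modeled as a `java.util.concurrent.Semaphore` because it is acquired on one thread and released on the executor thread (a `ReentrantLock` would throw `IllegalMonitorStateException` there); whether Pulsar's real mutex type behaves this way is an assumption. The state is checked before scheduling and again inside the task, and the task releases the mutex in `finally` on every path.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical, simplified model of the rollover path; not Pulsar code.
final class RecheckStateSketch {
    enum State { Open, Closed, Fenced }

    // Stand-in for metadataMutex: a Semaphore may be released by a different
    // thread than the one that acquired it, unlike ReentrantLock.
    final Semaphore metadataMutex = new Semaphore(1);
    final AtomicReference<State> state = new AtomicReference<>(State.Open);
    final ExecutorService executor = Executors.newSingleThreadExecutor();
    volatile long currentLedgerId = -1L;

    static boolean closedOrFenced(State s) {
        return s == State.Closed || s == State.Fenced;
    }

    // Called with metadataMutex held, like operationComplete() after the
    // ledgers list has been written to the metadata store.
    void onLedgersListUpdated(long newLedgerId) {
        if (closedOrFenced(state.get())) {
            metadataMutex.release(); // early exit must still release
            return;
        }
        executor.execute(() -> {
            try {
                // Re-check: the ledger may have been closed or fenced while
                // this task was waiting in the executor queue.
                if (closedOrFenced(state.get())) {
                    return;
                }
                synchronized (this) {
                    currentLedgerId = newLedgerId;
                }
            } finally {
                metadataMutex.release(); // released on every path of the task
            }
        });
    }

    // Runs one rollover; if closeWhileQueued, the state flips to Closed after
    // the task is scheduled but before it runs. Returns {ledgerId, freePermits}.
    static long[] scenario(boolean closeWhileQueued) {
        RecheckStateSketch ml = new RecheckStateSketch();
        CountDownLatch gate = new CountDownLatch(1);
        try {
            // Park the single executor thread so the update task stays queued.
            ml.executor.execute(() -> {
                try {
                    gate.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            ml.metadataMutex.acquire();
            ml.onLedgersListUpdated(42L);
            if (closeWhileQueued) {
                ml.state.set(State.Closed);
            }
            gate.countDown();
            ml.executor.shutdown(); // already-queued tasks still run
            ml.executor.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return new long[] {ml.currentLedgerId, ml.metadataMutex.availablePermits()};
    }

    public static void main(String[] args) {
        long[] skipped = scenario(true);
        long[] applied = scenario(false);
        System.out.println("closed while queued: ledger=" + skipped[0] + ", permits=" + skipped[1]);
        System.out.println("still open:          ledger=" + applied[0] + ", permits=" + applied[1]);
    }
}
```

`scenario(true)` flips the state to `Closed` while the update task is still queued behind a latch; the task then skips the update and still returns the permit, so the ledger id stays at -1 and one permit is free.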



##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java:
##########
@@ -2070,6 +2087,17 @@ void clearPendingAddEntries(ManagedLedgerException e) {
         }
     }
 
+    void clearNotInitiatedPendingAddEntries(ManagedLedgerException e) {
+        Iterator<OpAddEntry> iterator = pendingAddEntries.iterator();
+        while (iterator.hasNext()) {
+            OpAddEntry op = iterator.next();
+            if (op.closeIfNotInitiated()) {
+                op.failed(e);
+                iterator.remove();

Review Comment:
   Here, perhaps `iterator.remove()` should be called first to remove the op 
from the queue, and then `op.failed(e)`.
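A minimal sketch of the suggested ordering, using toy stand-ins rather than Pulsar's `OpAddEntry`: the `Op` class, its `onFailure` callback and `scenario` are hypothetical, and `pendingAddEntries` is modeled as a `ConcurrentLinkedQueue`, whose iterator supports `remove()`. The motivation illustrated is an assumption, not something the comment states: if `failed(e)` triggers callbacks, removing first means nothing reached from a callback can observe the already-failed op in the queue, and the op is dequeued even if the callback throws.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Consumer;

// Hypothetical model of clearNotInitiatedPendingAddEntries(); not Pulsar code.
final class RemoveThenFailSketch {
    // Toy stand-in for OpAddEntry.
    static final class Op {
        final boolean initiated;
        final Consumer<Op> onFailure; // stands in for the add-entry callback
        private boolean closed;

        Op(boolean initiated, Consumer<Op> onFailure) {
            this.initiated = initiated;
            this.onFailure = onFailure;
        }

        // Closes the op only if it was never initiated and is not yet closed.
        synchronized boolean closeIfNotInitiated() {
            if (initiated || closed) {
                return false;
            }
            closed = true;
            return true;
        }

        // The toy ignores e and just runs the callback.
        void failed(Exception e) {
            onFailure.accept(this);
        }
    }

    final ConcurrentLinkedQueue<Op> pendingAddEntries = new ConcurrentLinkedQueue<>();

    void clearNotInitiatedPendingAddEntries(Exception e) {
        Iterator<Op> iterator = pendingAddEntries.iterator();
        while (iterator.hasNext()) {
            Op op = iterator.next();
            if (op.closeIfNotInitiated()) {
                iterator.remove(); // dequeue first...
                op.failed(e);      // ...then notify, so callbacks never see it queued
            }
        }
    }

    // Returns {ops left in queue, failure callbacks run, callbacks that still saw their op queued}.
    static int[] scenario() {
        RemoveThenFailSketch ml = new RemoveThenFailSketch();
        List<Boolean> stillQueued = new ArrayList<>();
        Consumer<Op> callback = op -> stillQueued.add(ml.pendingAddEntries.contains(op));
        ml.pendingAddEntries.add(new Op(false, callback));
        ml.pendingAddEntries.add(new Op(true, callback));
        ml.pendingAddEntries.add(new Op(false, callback));
        ml.clearNotInitiatedPendingAddEntries(new Exception("ledger closed"));
        int sawQueued = 0;
        for (boolean queued : stillQueued) {
            if (queued) {
                sawQueued++;
            }
        }
        return new int[] {ml.pendingAddEntries.size(), stillQueued.size(), sawQueued};
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(scenario()));
    }
}
```

With one initiated op between two uninitiated ones, `scenario()` leaves only the initiated op queued, and neither failure callback finds its op still in the queue.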



##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java:
##########
@@ -1751,20 +1753,32 @@ public void operationComplete(Void v, Stat stat) {
                         log.debug("[{}] Updating of ledgers list after create complete. version={}", name, stat);
                     }
                     ledgersStat = stat;
-                    synchronized (ManagedLedgerImpl.this) {
-                        LedgerHandle originalCurrentLedger = currentLedger;
-                        ledgers.put(lh.getId(), newLedger);
-                        currentLedger = lh;
-                        currentLedgerTimeoutTriggered = new AtomicBoolean();
-                        currentLedgerEntries = 0;
-                        currentLedgerSize = 0;
-                        updateLedgersIdsComplete(originalCurrentLedger);
-                        mbean.addLedgerSwitchLatencySample(System.currentTimeMillis()
-                                - lastLedgerCreationInitiationTimestamp, TimeUnit.MILLISECONDS);
-                        // May need to update the cursor position
-                        maybeUpdateCursorBeforeTrimmingConsumedLedger();
+                    // make sure that pendingAddEntries' operations are executed in the same thread
+                    // to avoid potential concurrent issues
+                    State state = STATE_UPDATER.get(ManagedLedgerImpl.this);
+                    if (state == State.Closed || state.isFenced()) {

Review Comment:
   The metadataMutex is acquired in updateLedgersListAfterRollover() and must 
always be released. In operationComplete(), the "Closed/Fenced" branch skips 
the ledger update but fails to call metadataMutex.unlock(), causing a lock leak 
that may deadlock subsequent metadata operations. Ensure unlock() is invoked in 
all branches, for example, by using a finally block.
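One subtlety with the `finally` suggestion: in the non-closed branch the diff moves `metadataMutex.unlock()` into the executor task, so a plain `finally` around the whole body of `operationComplete()` would release the mutex before that task runs. A minimal sketch of one way to reconcile the two, assuming hypothetical names (`MutexHandoffSketch`, `handedOff`, `updatesApplied`, `scenario`) and with `metadataMutex` modeled as a `java.util.concurrent.Semaphore` (an assumption): the outer `finally` releases only if ownership was not handed to the task, which also covers `executor.execute()` throwing `RejectedExecutionException`.

```java
import java.util.Arrays;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of releasing a mutex on every branch; not Pulsar code.
final class MutexHandoffSketch {
    // Stand-in for metadataMutex; a Semaphore can be released from any thread.
    final Semaphore metadataMutex = new Semaphore(1);
    volatile boolean closedOrFenced;
    final AtomicInteger updatesApplied = new AtomicInteger();

    // Must be called with metadataMutex held.
    void operationComplete(ExecutorService executor) {
        boolean handedOff = false;
        try {
            if (closedOrFenced) {
                return; // skipped update: the finally block below releases
            }
            executor.execute(() -> {
                try {
                    if (!closedOrFenced) {
                        updatesApplied.incrementAndGet();
                    }
                } finally {
                    metadataMutex.release(); // the task owns the release now
                }
            });
            handedOff = true; // reached only if execute() did not throw
        } finally {
            if (!handedOff) {
                metadataMutex.release(); // early return or rejected submission
            }
        }
    }

    // Exercises the three paths; returns {permits after closed, permits after
    // normal, updates applied, 1 if rejected, permits after rejection}.
    static int[] scenario() {
        ExecutorService executor = Executors.newSingleThreadExecutor();

        MutexHandoffSketch closed = new MutexHandoffSketch();
        closed.closedOrFenced = true;
        closed.metadataMutex.acquireUninterruptibly();
        closed.operationComplete(executor);

        MutexHandoffSketch open = new MutexHandoffSketch();
        open.metadataMutex.acquireUninterruptibly();
        open.operationComplete(executor);
        executor.shutdown();
        try {
            executor.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }

        // After shutdown(), execute() throws RejectedExecutionException.
        MutexHandoffSketch rejected = new MutexHandoffSketch();
        rejected.metadataMutex.acquireUninterruptibly();
        int wasRejected = 0;
        try {
            rejected.operationComplete(executor);
        } catch (RejectedExecutionException e) {
            wasRejected = 1;
        }
        return new int[] {
            closed.metadataMutex.availablePermits(),
            open.metadataMutex.availablePermits(),
            open.updatesApplied.get(),
            wasRejected,
            rejected.metadataMutex.availablePermits()
        };
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(scenario()));
    }
}
```

Setting `handedOff = true` before calling `execute()` instead of after would reintroduce the leak on rejection, which is why the flag is set only once `execute()` has returned.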



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
