codelipenghui commented on code in PR #25240:
URL: https://github.com/apache/pulsar/pull/25240#discussion_r2891745220


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java:
##########
@@ -1751,20 +1753,32 @@ public void operationComplete(Void v, Stat stat) {
                         log.debug("[{}] Updating of ledgers list after create complete. version={}", name, stat);
                     }
                     ledgersStat = stat;
-                    synchronized (ManagedLedgerImpl.this) {
-                        LedgerHandle originalCurrentLedger = currentLedger;
-                        ledgers.put(lh.getId(), newLedger);
-                        currentLedger = lh;
-                        currentLedgerTimeoutTriggered = new AtomicBoolean();
-                        currentLedgerEntries = 0;
-                        currentLedgerSize = 0;
-                        updateLedgersIdsComplete(originalCurrentLedger);
-                        mbean.addLedgerSwitchLatencySample(System.currentTimeMillis()
-                                - lastLedgerCreationInitiationTimestamp, TimeUnit.MILLISECONDS);
-                        // May need to update the cursor position
-                        maybeUpdateCursorBeforeTrimmingConsumedLedger();
+                    // make sure that pendingAddEntries' operations are executed in the same thread
+                    // to avoid potential concurrent issues
+                    State state = STATE_UPDATER.get(ManagedLedgerImpl.this);
+                    if (state == State.Closed || state.isFenced()) {
+                        if (log.isDebugEnabled()) {
+                            log.debug("[{}] Skipping ledger update after create complete because ledger is "
+                                + "closed or fenced", name);
+                        }

Review Comment:
   [Critical] Two issues in this branch:
   
   1. **`metadataMutex` deadlock**: When this branch is taken, 
   `metadataMutex.unlock()` is never called. The mutex was acquired in 
   `updateLedgersListAfterRollover()` via `metadataMutex.tryLock()`. Since 
   `CallbackMutex` is a `Semaphore(1)`, the permit is permanently consumed. All 
   future metadata operations (trimming, offloading, etc.) that need this mutex 
   will either block forever (if they wait for it) or silently skip their work 
   (if they use `tryLock()`).
   
   2. **LedgerHandle `lh` leak**: The newly created BookKeeper ledger handle 
   `lh` is neither assigned to `currentLedger` nor closed via `lh.closeAsync()` 
   in this branch, so the handle leaks. Note that `createComplete` (line 1712) 
   only handles `State.Closed` with `lh.closeAsync()`, not fenced states, so the 
   fenced path always reaches here.
   
   Suggested fix:
   ```java
   if (state == State.Closed || state.isFenced()) {
       metadataMutex.unlock();
       if (lh != null) {
           lh.closeAsync();
       }
       ...
   }
   ```
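   
   To illustrate the first issue in isolation, here is a minimal sketch of how 
   an early return that skips `unlock()` starves later callers. It is not 
   Pulsar code: `SimpleCallbackMutex` is a hypothetical stand-in that assumes 
   only what is stated above (a mutex backed by `Semaphore(1)`), and the 
   `leakyOnComplete` / `fixedOnComplete` callbacks are invented for the example.
   
   ```java
   import java.util.concurrent.Semaphore;

   // Hypothetical stand-in for a semaphore-backed mutex; not Pulsar's CallbackMutex.
   class SimpleCallbackMutex {
       private final Semaphore semaphore = new Semaphore(1);

       boolean tryLock() {
           return semaphore.tryAcquire();
       }

       void unlock() {
           semaphore.release();
       }
   }

   class MutexLeakDemo {
       // Callback that forgets to release the mutex on the early-return path.
       static void leakyOnComplete(SimpleCallbackMutex mutex, boolean closedOrFenced) {
           if (closedOrFenced) {
               return; // the permit is never returned
           }
           mutex.unlock();
       }

       // Callback that releases the mutex on every path.
       static void fixedOnComplete(SimpleCallbackMutex mutex, boolean closedOrFenced) {
           try {
               if (closedOrFenced) {
                   return;
               }
               // ... normal update logic would go here ...
           } finally {
               mutex.unlock();
           }
       }

       // Returns true if a later caller can still acquire the mutex.
       static boolean laterCallerCanLock(boolean useFixed) {
           SimpleCallbackMutex mutex = new SimpleCallbackMutex();
           mutex.tryLock(); // acquired before the async operation starts
           if (useFixed) {
               fixedOnComplete(mutex, true);
           } else {
               leakyOnComplete(mutex, true);
           }
           return mutex.tryLock();
       }

       public static void main(String[] args) {
           System.out.println("leaky: " + laterCallerCanLock(false));
           System.out.println("fixed: " + laterCallerCanLock(true));
       }
   }
   ```
   
   With the leaky callback the second `tryLock()` fails, which corresponds to 
   the "silently skip" case; a blocking acquire would wait forever instead.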



##########
managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java:
##########
@@ -764,6 +764,93 @@ public void invalidReadEntriesArg2() throws Exception {
         fail("Should have thrown an exception in the above line");
     }
 
+    @Test(timeOut = 30000)
+    public void testCloseManagedLedgerAfterRollover() throws Exception {
+        ManagedLedgerFactoryConfig config = new ManagedLedgerFactoryConfig();
+        config.setMaxCacheSize(0);
+        ManagedLedgerFactoryImpl factory = new ManagedLedgerFactoryImpl(metadataStore, bkc, config);

Review Comment:
   [Minor] This `factory` is never shut down. Consider adding 
   `@Cleanup("shutdown")`, as the second test (`testFencedManagedLedgerAfterAdd`) 
   does for its factory. Without cleanup, the factory's internal executors leak.



##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java:
##########
@@ -4418,11 +4447,17 @@ private void checkManagedLedgerIsOpen() throws ManagedLedgerException {
     public synchronized void setFenced() {
         log.info("{} Moving to Fenced state", name);
         STATE_UPDATER.set(this, State.Fenced);
+        executor.execute(() -> clearNotInitiatedPendingAddEntries(new ManagedLedgerFencedException("ManagedLedger "

Review Comment:
   [Minor] The `executor.execute()` call is not wrapped in a try-catch for 
   `RejectedExecutionException`. During shutdown, the executor may already be 
   terminated, in which case the cleanup task never runs and the pending add 
   entries are never failed. The same applies to the other `executor.execute()` 
   calls in `asyncClose`, `setFencedForDeletion`, and `operationComplete`.
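   
   A minimal sketch of one way to guard such a call, using only the JDK. The 
   helper name `executeOrRunInline` and the choice to run the task inline on 
   rejection are assumptions made for illustration; running inline gives up the 
   same-thread guarantee this PR is after, so logging the rejection and failing 
   the pending entries some other way may be the better fit here.
   
   ```java
   import java.util.concurrent.ExecutorService;
   import java.util.concurrent.Executors;
   import java.util.concurrent.RejectedExecutionException;
   import java.util.concurrent.atomic.AtomicBoolean;

   class SafeExecuteDemo {
       // Hands the task to the executor, or runs it inline if the executor rejects it.
       // Returns true when the executor accepted the task.
       static boolean executeOrRunInline(ExecutorService executor, Runnable task) {
           try {
               executor.execute(task);
               return true;
           } catch (RejectedExecutionException e) {
               // The executor no longer accepts tasks: fall back so the cleanup still happens.
               task.run();
               return false;
           }
       }

       // Returns true if the cleanup ran even though the executor was already shut down.
       static boolean cleanupRanAfterShutdown() {
           ExecutorService executor = Executors.newSingleThreadExecutor();
           executor.shutdown();
           AtomicBoolean cleaned = new AtomicBoolean(false);
           boolean accepted = executeOrRunInline(executor, () -> cleaned.set(true));
           return !accepted && cleaned.get();
       }

       public static void main(String[] args) {
           System.out.println("cleanup ran after shutdown: " + cleanupRanAfterShutdown());
       }
   }
   ```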



##########
managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java:
##########
@@ -764,6 +764,93 @@ public void invalidReadEntriesArg2() throws Exception {
         fail("Should have thrown an exception in the above line");
     }
 
+    @Test(timeOut = 30000)
+    public void testCloseManagedLedgerAfterRollover() throws Exception {
+        ManagedLedgerFactoryConfig config = new ManagedLedgerFactoryConfig();
+        config.setMaxCacheSize(0);
+        ManagedLedgerFactoryImpl factory = new ManagedLedgerFactoryImpl(metadataStore, bkc, config);
+        ManagedLedgerImpl realLedger = (ManagedLedgerImpl) factory.open("my_test_ledger");
+        ManagedLedgerImpl ledger = Mockito.spy(realLedger);
+        AtomicBoolean onlyOnce = new AtomicBoolean(false);
+        when(ledger.currentLedgerIsFull()).thenAnswer(invocation -> onlyOnce.compareAndSet(false, true));
+        OpAddEntry realOp = OpAddEntry.createNoRetainBuffer(ledger,
+                    ByteBufAllocator.DEFAULT.buffer(128), null, null, new AtomicBoolean());
+        OpAddEntry op = spy(realOp);
+        CountDownLatch createLatch = new CountDownLatch(1);
+        CountDownLatch closeLatch = new CountDownLatch(1);
+        doAnswer(invocationOnMock -> {
+            // Simulate that before the rollover is completed, new write requests arrive,
+            // and after these write requests are added to pendingAddEntries, the ledger is closed.
+            log.info("before add, ledger state:{}", ledger.state);
+            for (int i = 0; i < 10; ++i) {
+                ledger.internalAsyncAddEntry(OpAddEntry.createNoRetainBuffer(ledger,
+                        ByteBufAllocator.DEFAULT.buffer(128), null, null, new AtomicBoolean()));
+            }
+            ledger.asyncClose(new CloseCallback() {
+                @Override
+                public void closeComplete(Object ctx) {
+                    log.info("closeComplete finished, ledger state:{}", ledger.state);
+                    closeLatch.countDown();
+                }
+
+                @Override
+                public void closeFailed(ManagedLedgerException exception, Object ctx) {
+                    log.info("closeFailed, ex:{}, state:{}", exception.getMessage(), ledger.state);
+                    closeLatch.countDown();
+                }
+            }, null);
+            log.info("after add, ledger state:{}", ledger.state);
+            return invocationOnMock.callRealMethod();
+        }).when(ledger).asyncCreateLedger(any(), any(), any(), any(), any());
+        doAnswer(invocationOnMock -> {
+            Object o = invocationOnMock.callRealMethod();
+            log.info("createComplete finished, state:{}", ledger.state);
+            ledger.executor.execute(createLatch::countDown);
+            return o;
+        }).when(ledger).createComplete(anyInt(), any(), any());
+        ledger.internalAsyncAddEntry(op);
+        createLatch.await();
+        closeLatch.await();
+        Assert.assertEquals(ledger.pendingAddEntries.size(), 0);
+    }
+
+    @Test(timeOut = 20000)
+    public void testFencedManagedLedgerAfterAdd() throws Exception {
+        @Cleanup("shutdown")
+        ManagedLedgerFactory factory1 = new ManagedLedgerFactoryImpl(metadataStore, bkc);
+        ManagedLedgerImpl realLedger = (ManagedLedgerImpl) factory1.open("my_test_ledger");
+        ManagedLedgerImpl ledger = spy(realLedger);
+
+        int sendNum = 10;
+        CountDownLatch sendLatch = new CountDownLatch(sendNum);
+        CountDownLatch fencedLatch = new CountDownLatch(1);
+        doAnswer(invocationOnMock -> {
+            for (int i = 0; i < sendNum; ++i) {

Review Comment:
   [Minor] `stopBookKeeper()` and `stopMetadataStore()` are called inside the 
   loop (10 times). They should be called once, before the loop; subsequent 
   calls are no-ops since `bkc` is already null after the first 
   `stopBookKeeper()`.



##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java:
##########
@@ -1937,6 +1951,9 @@ synchronized void ledgerClosed(final LedgerHandle lh, Long lastAddConfirmed) {
             // The managed ledger was closed during the write operation
             clearPendingAddEntries(new ManagedLedgerAlreadyClosedException("Managed ledger was already closed"));
             return;

Review Comment:
   [Minor] This is a correct fix. Consider adding a log at WARN level here, 
similar to how other state transitions are logged. When all pending entries are 
cleared due to fencing, operators should have a trace in production logs to 
diagnose message loss:
   ```java
   } else if (state.isFenced()) {
       log.warn("[{}] Managed ledger is fenced during ledgerClosed, clearing {} pending add entries",
           name, pendingAddEntries.size());
       clearPendingAddEntries(new ManagedLedgerFencedException("Managed ledger is fenced"));
       return;
   }
   ```



##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java:
##########
@@ -1751,20 +1753,32 @@ public void operationComplete(Void v, Stat stat) {
                         log.debug("[{}] Updating of ledgers list after create complete. version={}", name, stat);
                     }
                     ledgersStat = stat;
-                    synchronized (ManagedLedgerImpl.this) {
-                        LedgerHandle originalCurrentLedger = currentLedger;
-                        ledgers.put(lh.getId(), newLedger);
-                        currentLedger = lh;
-                        currentLedgerTimeoutTriggered = new AtomicBoolean();
-                        currentLedgerEntries = 0;
-                        currentLedgerSize = 0;
-                        updateLedgersIdsComplete(originalCurrentLedger);
-                        mbean.addLedgerSwitchLatencySample(System.currentTimeMillis()
-                                - lastLedgerCreationInitiationTimestamp, TimeUnit.MILLISECONDS);
-                        // May need to update the cursor position
-                        maybeUpdateCursorBeforeTrimmingConsumedLedger();
+                    // make sure that pendingAddEntries' operations are executed in the same thread
+                    // to avoid potential concurrent issues
+                    State state = STATE_UPDATER.get(ManagedLedgerImpl.this);

Review Comment:
   [Important] TOCTOU race: The state is read here *outside* 
`executor.execute()`, but the ledger update runs *inside* it asynchronously. 
Between this check and when the lambda executes, `asyncClose()` or 
`setFenced()` could change the state. The lambda would then proceed to call 
`updateLedgersIdsComplete()` which sets state to `LedgerOpened` and calls 
`initiate()` on pending ops — defeating the purpose of this fix.
   
   The original code didn't have this race because the `synchronized` block was 
entered immediately.
   
   Consider moving the state check inside the executor's synchronized block:
   ```java
   executor.execute(() -> {
       synchronized (ManagedLedgerImpl.this) {
           State state = STATE_UPDATER.get(ManagedLedgerImpl.this);
           if (state == State.Closed || state.isFenced()) {
               lh.closeAsync();
               metadataMutex.unlock();
               return;
           }
           // ... existing update logic ...
       }
       metadataMutex.unlock();
   });
   ```
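   
   The race can be reproduced deterministically outside Pulsar. The sketch below 
   is illustrative only: `DeferredExecutor`, the two-value `State` enum, and 
   `updateRanAfterClose` are hypothetical names, and the deferred executor 
   simply makes the gap between submitting a task and running it explicit.
   
   ```java
   import java.util.ArrayDeque;
   import java.util.Queue;
   import java.util.concurrent.atomic.AtomicReference;

   class ToctouDemo {
       enum State { OPEN, CLOSED }

       // Tasks run only when drain() is called, so the window between
       // submission and execution is under the caller's control.
       static final class DeferredExecutor {
           private final Queue<Runnable> tasks = new ArrayDeque<>();

           void execute(Runnable task) {
               tasks.add(task);
           }

           void drain() {
               Runnable task;
               while ((task = tasks.poll()) != null) {
                   task.run();
               }
           }
       }

       // Returns true if the update ran although the state was CLOSED by then.
       static boolean updateRanAfterClose(boolean checkInsideTask) {
           AtomicReference<State> state = new AtomicReference<>(State.OPEN);
           boolean[] updated = {false};
           Object lock = new Object();
           DeferredExecutor executor = new DeferredExecutor();

           if (checkInsideTask) {
               // State is re-read under the lock at execution time.
               executor.execute(() -> {
                   synchronized (lock) {
                       if (state.get() == State.CLOSED) {
                           return;
                       }
                       updated[0] = true;
                   }
               });
           } else if (state.get() != State.CLOSED) {
               // State is checked at submission time only (the racy variant).
               executor.execute(() -> {
                   synchronized (lock) {
                       updated[0] = true;
                   }
               });
           }

           state.set(State.CLOSED); // the close lands between submit and execution
           executor.drain();
           return updated[0];
       }

       public static void main(String[] args) {
           System.out.println("check outside task: " + updateRanAfterClose(false));
           System.out.println("check inside task:  " + updateRanAfterClose(true));
       }
   }
   ```
   
   Only the variant that checks the state inside the task skips the update once 
   the close has happened.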


