codelipenghui commented on code in PR #25240:
URL: https://github.com/apache/pulsar/pull/25240#discussion_r2891745220
##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java:
##########
@@ -1751,20 +1753,32 @@ public void operationComplete(Void v, Stat stat) {
                    log.debug("[{}] Updating of ledgers list after create complete. version={}", name, stat);
                }
                ledgersStat = stat;
-               synchronized (ManagedLedgerImpl.this) {
-                   LedgerHandle originalCurrentLedger = currentLedger;
-                   ledgers.put(lh.getId(), newLedger);
-                   currentLedger = lh;
-                   currentLedgerTimeoutTriggered = new AtomicBoolean();
-                   currentLedgerEntries = 0;
-                   currentLedgerSize = 0;
-                   updateLedgersIdsComplete(originalCurrentLedger);
-                   mbean.addLedgerSwitchLatencySample(System.currentTimeMillis()
-                           - lastLedgerCreationInitiationTimestamp, TimeUnit.MILLISECONDS);
-                   // May need to update the cursor position
-                   maybeUpdateCursorBeforeTrimmingConsumedLedger();
+               // make sure that pendingAddEntries' operations are executed in the same thread
+               // to avoid potential concurrent issues
+               State state = STATE_UPDATER.get(ManagedLedgerImpl.this);
+               if (state == State.Closed || state.isFenced()) {
+                   if (log.isDebugEnabled()) {
+                       log.debug("[{}] Skipping ledger update after create complete because ledger is "
+                               + "closed or fenced", name);
+                   }
Review Comment:
[Critical] Two issues in this branch:
1. **`metadataMutex` deadlock**: When this branch is taken,
`metadataMutex.unlock()` is never called. The mutex was acquired in
`updateLedgersListAfterRollover()` via `metadataMutex.tryLock()`. Since
`CallbackMutex` is a `Semaphore(1)`, the permit is permanently consumed. All
future metadata operations (trimming, offloading, etc.) that need this mutex
will deadlock or silently skip.
2. **LedgerHandle `lh` leak**: The newly created BookKeeper ledger handle
`lh` is never closed in this branch. It's not assigned to `currentLedger` and
not closed via `lh.closeAsync()`. This leaks a BK ledger handle. Note that
`createComplete` (line 1712) only handles `State.Closed` with
`lh.closeAsync()`, not fenced states, so the fenced path always reaches here.
Suggested fix:
```java
if (state == State.Closed || state.isFenced()) {
    metadataMutex.unlock();
    if (lh != null) {
        lh.closeAsync();
    }
    ...
}
```
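The permit-leak failure mode from point 1 can be sketched with a minimal stand-in. This is not Pulsar's actual `CallbackMutex`, just an illustrative `Semaphore(1)` wrapper as the comment above describes it:

```java
import java.util.concurrent.Semaphore;

// Illustrative stand-in for a CallbackMutex-style lock built on
// Semaphore(1); not Pulsar's actual implementation.
class CallbackMutexSketch {
    private final Semaphore semaphore = new Semaphore(1);

    boolean tryLock() {
        return semaphore.tryAcquire();
    }

    void unlock() {
        semaphore.release();
    }
}

public class PermitLeakDemo {
    public static void main(String[] args) {
        CallbackMutexSketch metadataMutex = new CallbackMutexSketch();

        // operationComplete() path: the permit is acquired, but the
        // early-return branch skips unlock(), so it is never released.
        boolean acquired = metadataMutex.tryLock();

        // Every later metadata operation (trimming, offloading, ...)
        // that uses tryLock() now fails and silently skips, forever.
        boolean nextOperation = metadataMutex.tryLock();

        System.out.println(acquired + " " + nextOperation); // prints "true false"
    }
}
```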
##########
managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java:
##########
@@ -764,6 +764,93 @@ public void invalidReadEntriesArg2() throws Exception {
fail("Should have thrown an exception in the above line");
}
+    @Test(timeOut = 30000)
+    public void testCloseManagedLedgerAfterRollover() throws Exception {
+        ManagedLedgerFactoryConfig config = new ManagedLedgerFactoryConfig();
+        config.setMaxCacheSize(0);
+        ManagedLedgerFactoryImpl factory = new ManagedLedgerFactoryImpl(metadataStore, bkc, config);
Review Comment:
[Minor] This `factory` is never shut down. Consider adding
`@Cleanup("shutdown")` as the second test (`testFencedManagedLedgerAfterAdd`)
does for its factory. Without cleanup, the factory's internal executors leak.
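For context, Lombok's `@Cleanup("shutdown")` roughly desugars to a try/finally that invokes the named method when the variable leaves scope. A hedged sketch of that desugaring, with `FakeFactory` as a hypothetical stand-in for `ManagedLedgerFactoryImpl`:

```java
// Hypothetical stand-in for ManagedLedgerFactoryImpl; only here to
// illustrate what Lombok's @Cleanup("shutdown") generates.
class FakeFactory {
    boolean shutdownCalled = false;

    void shutdown() {
        shutdownCalled = true;
    }
}

public class CleanupDesugarDemo {
    static FakeFactory runTestBody() {
        FakeFactory factory = new FakeFactory();
        try {
            // ... test body uses the factory here ...
        } finally {
            // Equivalent of @Cleanup("shutdown") on the local variable:
            // the factory's internal executors are released even if the
            // test body throws.
            factory.shutdown();
        }
        return factory;
    }

    public static void main(String[] args) {
        System.out.println(runTestBody().shutdownCalled); // prints "true"
    }
}
```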
##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java:
##########
@@ -4418,11 +4447,17 @@ private void checkManagedLedgerIsOpen() throws ManagedLedgerException {
    public synchronized void setFenced() {
        log.info("{} Moving to Fenced state", name);
        STATE_UPDATER.set(this, State.Fenced);
+       executor.execute(() -> clearNotInitiatedPendingAddEntries(new ManagedLedgerFencedException("ManagedLedger "
Review Comment:
[Minor] The `executor.execute()` call is not wrapped in try-catch for
`RejectedExecutionException`. During shutdown, the executor may already be
terminated, causing the cleanup to silently fail. Same applies to the other
`executor.execute()` calls in `asyncClose`, `setFencedForDeletion`, and
`operationComplete`.
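One possible guard, sketched with the plain JDK executor API. The fallback policy here (running the cleanup inline rather than logging and skipping) is an assumption, not something the PR specifies:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;

public class SafeExecuteDemo {
    // Returns true if the task was handed to the executor, false if it
    // was rejected (e.g. the executor already terminated during
    // shutdown) and was handled by the fallback path instead.
    static boolean safeExecute(ExecutorService executor, Runnable cleanupTask) {
        try {
            executor.execute(cleanupTask);
            return true;
        } catch (RejectedExecutionException e) {
            // Assumed fallback: run the cleanup inline so pending
            // entries still get failed; logging and skipping would be
            // another option.
            cleanupTask.run();
            return false;
        }
    }

    public static void main(String[] args) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        executor.shutdown(); // simulate shutdown racing with the cleanup

        AtomicBoolean cleaned = new AtomicBoolean(false);
        boolean submitted = safeExecute(executor, () -> cleaned.set(true));

        System.out.println(submitted + " " + cleaned.get()); // prints "false true"
    }
}
```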
##########
managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java:
##########
@@ -764,6 +764,93 @@ public void invalidReadEntriesArg2() throws Exception {
fail("Should have thrown an exception in the above line");
}
+    @Test(timeOut = 30000)
+    public void testCloseManagedLedgerAfterRollover() throws Exception {
+        ManagedLedgerFactoryConfig config = new ManagedLedgerFactoryConfig();
+        config.setMaxCacheSize(0);
+        ManagedLedgerFactoryImpl factory = new ManagedLedgerFactoryImpl(metadataStore, bkc, config);
+        ManagedLedgerImpl realLedger = (ManagedLedgerImpl) factory.open("my_test_ledger");
+        ManagedLedgerImpl ledger = Mockito.spy(realLedger);
+        AtomicBoolean onlyOnce = new AtomicBoolean(false);
+        when(ledger.currentLedgerIsFull()).thenAnswer(invocation -> onlyOnce.compareAndSet(false, true));
+        OpAddEntry realOp = OpAddEntry.createNoRetainBuffer(ledger,
+                ByteBufAllocator.DEFAULT.buffer(128), null, null, new AtomicBoolean());
+        OpAddEntry op = spy(realOp);
+        CountDownLatch createLatch = new CountDownLatch(1);
+        CountDownLatch closeLatch = new CountDownLatch(1);
+        doAnswer(invocationOnMock -> {
+            // Simulate that before the rollover is completed, new write requests arrive,
+            // and after these write requests are added to pendingAddEntries, the ledger is closed.
+            log.info("before add, ledger state:{}", ledger.state);
+            for (int i = 0; i < 10; ++i) {
+                ledger.internalAsyncAddEntry(OpAddEntry.createNoRetainBuffer(ledger,
+                        ByteBufAllocator.DEFAULT.buffer(128), null, null, new AtomicBoolean()));
+            }
+            ledger.asyncClose(new CloseCallback() {
+                @Override
+                public void closeComplete(Object ctx) {
+                    log.info("closeComplete finished, ledger state:{}", ledger.state);
+                    closeLatch.countDown();
+                }
+
+                @Override
+                public void closeFailed(ManagedLedgerException exception, Object ctx) {
+                    log.info("closeFailed, ex:{}, state:{}", exception.getMessage(), ledger.state);
+                    closeLatch.countDown();
+                }
+            }, null);
+            log.info("after add, ledger state:{}", ledger.state);
+            return invocationOnMock.callRealMethod();
+        }).when(ledger).asyncCreateLedger(any(), any(), any(), any(), any());
+        doAnswer(invocationOnMock -> {
+            Object o = invocationOnMock.callRealMethod();
+            log.info("createComplete finished, state:{}", ledger.state);
+            ledger.executor.execute(createLatch::countDown);
+            return o;
+        }).when(ledger).createComplete(anyInt(), any(), any());
+        ledger.internalAsyncAddEntry(op);
+        createLatch.await();
+        closeLatch.await();
+        Assert.assertEquals(ledger.pendingAddEntries.size(), 0);
+    }
+
+    @Test(timeOut = 20000)
+    public void testFencedManagedLedgerAfterAdd() throws Exception {
+        @Cleanup("shutdown")
+        ManagedLedgerFactory factory1 = new ManagedLedgerFactoryImpl(metadataStore, bkc);
+        ManagedLedgerImpl realLedger = (ManagedLedgerImpl) factory1.open("my_test_ledger");
+        ManagedLedgerImpl ledger = spy(realLedger);
+
+        int sendNum = 10;
+        CountDownLatch sendLatch = new CountDownLatch(sendNum);
+        CountDownLatch fencedLatch = new CountDownLatch(1);
+        doAnswer(invocationOnMock -> {
+            for (int i = 0; i < sendNum; ++i) {
Review Comment:
[Minor] `stopBookKeeper()` and `stopMetadataStore()` are called inside the
loop (10 times). They should be called once before the loop — subsequent calls
are no-ops since `bkc` is already null after the first `stopBookKeeper()`.
##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java:
##########
@@ -1937,6 +1951,9 @@ synchronized void ledgerClosed(final LedgerHandle lh, Long lastAddConfirmed) {
            // The managed ledger was closed during the write operation
            clearPendingAddEntries(new ManagedLedgerAlreadyClosedException("Managed ledger was already closed"));
            return;
Review Comment:
[Minor] This is a correct fix. Consider adding a log at WARN level here,
similar to how other state transitions are logged. When all pending entries are
cleared due to fencing, operators should have a trace in production logs to
diagnose message loss:
```java
} else if (state.isFenced()) {
    log.warn("[{}] Managed ledger is fenced during ledgerClosed, clearing {} pending add entries",
            name, pendingAddEntries.size());
    clearPendingAddEntries(new ManagedLedgerFencedException("Managed ledger is fenced"));
    return;
}
```
##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java:
##########
@@ -1751,20 +1753,32 @@ public void operationComplete(Void v, Stat stat) {
                    log.debug("[{}] Updating of ledgers list after create complete. version={}", name, stat);
                }
                ledgersStat = stat;
-               synchronized (ManagedLedgerImpl.this) {
-                   LedgerHandle originalCurrentLedger = currentLedger;
-                   ledgers.put(lh.getId(), newLedger);
-                   currentLedger = lh;
-                   currentLedgerTimeoutTriggered = new AtomicBoolean();
-                   currentLedgerEntries = 0;
-                   currentLedgerSize = 0;
-                   updateLedgersIdsComplete(originalCurrentLedger);
-                   mbean.addLedgerSwitchLatencySample(System.currentTimeMillis()
-                           - lastLedgerCreationInitiationTimestamp, TimeUnit.MILLISECONDS);
-                   // May need to update the cursor position
-                   maybeUpdateCursorBeforeTrimmingConsumedLedger();
+               // make sure that pendingAddEntries' operations are executed in the same thread
+               // to avoid potential concurrent issues
+               State state = STATE_UPDATER.get(ManagedLedgerImpl.this);
Review Comment:
[Important] TOCTOU race: The state is read here *outside*
`executor.execute()`, but the ledger update runs *inside* it asynchronously.
Between this check and when the lambda executes, `asyncClose()` or
`setFenced()` could change the state. The lambda would then proceed to call
`updateLedgersIdsComplete()` which sets state to `LedgerOpened` and calls
`initiate()` on pending ops — defeating the purpose of this fix.
The original code didn't have this race because the `synchronized` block was
entered immediately.
Consider moving the state check inside the executor's synchronized block:
```java
executor.execute(() -> {
    synchronized (ManagedLedgerImpl.this) {
        State state = STATE_UPDATER.get(ManagedLedgerImpl.this);
        if (state == State.Closed || state.isFenced()) {
            lh.closeAsync();
            metadataMutex.unlock();
            return;
        }
        // ... existing update logic ...
    }
    metadataMutex.unlock();
});
```