Copilot commented on code in PR #25240:
URL: https://github.com/apache/pulsar/pull/25240#discussion_r2793210568
##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java:
##########
@@ -1751,20 +1752,24 @@ public void operationComplete(Void v, Stat stat) {
log.debug("[{}] Updating of ledgers list after create complete. version={}", name, stat);
}
ledgersStat = stat;
- synchronized (ManagedLedgerImpl.this) {
- LedgerHandle originalCurrentLedger = currentLedger;
- ledgers.put(lh.getId(), newLedger);
- currentLedger = lh;
- currentLedgerTimeoutTriggered = new AtomicBoolean();
- currentLedgerEntries = 0;
- currentLedgerSize = 0;
- updateLedgersIdsComplete(originalCurrentLedger);
- mbean.addLedgerSwitchLatencySample(System.currentTimeMillis()
- - lastLedgerCreationInitiationTimestamp, TimeUnit.MILLISECONDS);
- // May need to update the cursor position
- maybeUpdateCursorBeforeTrimmingConsumedLedger();
- }
- metadataMutex.unlock();
+ // make sure that pendingAddEntries' opeartions are executed in the same thread
+ // to avoid potential concurrent issues
+ executor.execute(() -> {
+ synchronized (ManagedLedgerImpl.this) {
+ LedgerHandle originalCurrentLedger = currentLedger;
+ ledgers.put(lh.getId(), newLedger);
+ currentLedger = lh;
+ currentLedgerTimeoutTriggered = new AtomicBoolean();
+ currentLedgerEntries = 0;
+ currentLedgerSize = 0;
+ updateLedgersIdsComplete(originalCurrentLedger);
+ mbean.addLedgerSwitchLatencySample(System.currentTimeMillis()
+ - lastLedgerCreationInitiationTimestamp, TimeUnit.MILLISECONDS);
+ // May need to update the cursor position
+ maybeUpdateCursorBeforeTrimmingConsumedLedger();
Review Comment:
The new executor.execute() block updates ledgers/currentLedger and calls
updateLedgersIdsComplete() without re-checking the ManagedLedger state. Because
this work is now deferred, it can run after asyncClose() or setFenced() and
potentially transition the state back to LedgerOpened and/or touch ledgers
after closure. Consider checking inside the executor task whether
STATE_UPDATER.get(this) is State.Closed or isFenced() returns true, and bailing
out (while still releasing metadataMutex) if the ledger is no longer writable.
```suggestion
// Re-check state after deferring to executor to avoid mutating a closed/fenced ledger
if (STATE_UPDATER.get(ManagedLedgerImpl.this) == State.Closed || isFenced()) {
    if (log.isDebugEnabled()) {
        log.debug("[{}] Skipping ledger update after create complete because ledger is "
                + "closed or fenced", name);
    }
} else {
    LedgerHandle originalCurrentLedger = currentLedger;
    ledgers.put(lh.getId(), newLedger);
    currentLedger = lh;
    currentLedgerTimeoutTriggered = new AtomicBoolean();
    currentLedgerEntries = 0;
    currentLedgerSize = 0;
    updateLedgersIdsComplete(originalCurrentLedger);
    mbean.addLedgerSwitchLatencySample(System.currentTimeMillis()
            - lastLedgerCreationInitiationTimestamp, TimeUnit.MILLISECONDS);
    // May need to update the cursor position
    maybeUpdateCursorBeforeTrimmingConsumedLedger();
}
```
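To illustrate the concern outside of Pulsar, here is a minimal standalone sketch of a deferred task that re-checks state before mutating and releases the mutex on every path. Everything in it is an assumption made for illustration: the class `DeferredLedgerSwitchSketch`, its trimmed-down `State` enum, the `Semaphore` standing in for metadataMutex, and the `simulate` helper are not part of ManagedLedgerImpl.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Simplified stand-in for ManagedLedgerImpl; all names here are illustrative only.
final class DeferredLedgerSwitchSketch {
    enum State { CreatingLedger, LedgerOpened, Closed, Fenced }

    private final AtomicReference<State> state = new AtomicReference<>(State.CreatingLedger);
    // A semaphore models a mutex that may be released by a different thread.
    private final Semaphore metadataMutex = new Semaphore(1);
    private final ExecutorService executor = Executors.newSingleThreadExecutor();
    private long currentLedgerId = -1;

    // Models the metadata callback: the caller holds metadataMutex when this runs.
    void onLedgersListUpdated(long newLedgerId) {
        executor.execute(() -> {
            try {
                synchronized (this) {
                    State s = state.get();
                    if (s == State.Closed || s == State.Fenced) {
                        return; // no longer writable: leave the ledger fields untouched
                    }
                    currentLedgerId = newLedgerId;
                    state.set(State.LedgerOpened);
                }
            } finally {
                metadataMutex.release(); // released on both the skip and the update path
            }
        });
    }

    synchronized long currentLedgerId() { return currentLedgerId; }
    State state() { return state.get(); }
    int freeMutexPermits() { return metadataMutex.availablePermits(); }

    // Runs one ledger switch with the given state in place before the task executes.
    static DeferredLedgerSwitchSketch simulate(State stateBeforeTask) {
        DeferredLedgerSwitchSketch ml = new DeferredLedgerSwitchSketch();
        ml.metadataMutex.acquireUninterruptibly();
        ml.state.set(stateBeforeTask);
        ml.onLedgersListUpdated(42L);
        ml.executor.shutdown();
        try {
            if (!ml.executor.awaitTermination(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("executor did not terminate");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return ml;
    }

    public static void main(String[] args) {
        for (State s : new State[] {State.CreatingLedger, State.Closed, State.Fenced}) {
            DeferredLedgerSwitchSketch ml = simulate(s);
            System.out.println(s + " -> state=" + ml.state()
                    + " ledger=" + ml.currentLedgerId()
                    + " freePermits=" + ml.freeMutexPermits());
        }
    }
}
```

Putting the release in a `finally` block is one way to keep the early return from leaking the mutex; the real fix may structure this differently.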
##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java:
##########
@@ -2070,6 +2078,17 @@ void clearPendingAddEntries(ManagedLedgerException e) {
}
}
+ void clearNotInitiatedPendingAddEntries(ManagedLedgerException e) {
+ Iterator<OpAddEntry> iterator = pendingAddEntries.iterator();
+ while (iterator.hasNext()) {
+ OpAddEntry op = iterator.next();
+ if (op.closeIfNotInitiated()) {
+ op.failed(e);
+ iterator.remove();
+ }
+ }
Review Comment:
There are existing tests in this module for pendingAddEntries behavior, but
the new leak-prevention logic (clearNotInitiatedPendingAddEntries +
OpAddEntry.closeIfNotInitiated) is not covered. Please add tests for the two
scenarios described in the PR (close/fence occurring while ops are queued but
not yet initiated, and fenced write-failure path clearing subsequent queued
ops) to prevent regressions.
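As a rough idea of what such a test would assert, the following standalone sketch models the queue with simplified stand-ins. The `Op` class, its `OpState` enum, and the compare-and-set semantics of `closeIfNotInitiated()` are assumptions for illustration and may not match the real OpAddEntry; only the loop in `clearNotInitiatedPendingAddEntries` mirrors the diff above.

```java
import java.util.Iterator;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicReference;

// Simplified model of the pending-add queue; not the real Pulsar classes.
final class PendingAddEntriesSketch {
    enum OpState { OPEN, INITIATED, CLOSED }

    static final class Op {
        final AtomicReference<OpState> state = new AtomicReference<>(OpState.OPEN);
        volatile Exception failure;

        // Models the write being handed to the ledger.
        void initiate() { state.compareAndSet(OpState.OPEN, OpState.INITIATED); }

        // Assumed semantics: succeeds only if the write has not started yet.
        boolean closeIfNotInitiated() { return state.compareAndSet(OpState.OPEN, OpState.CLOSED); }

        void failed(Exception e) { failure = e; }
    }

    final Queue<Op> pendingAddEntries = new ConcurrentLinkedQueue<>();

    // Same shape as the method added in the PR: fail and drop only non-initiated ops.
    void clearNotInitiatedPendingAddEntries(Exception e) {
        Iterator<Op> iterator = pendingAddEntries.iterator();
        while (iterator.hasNext()) {
            Op op = iterator.next();
            if (op.closeIfNotInitiated()) {
                op.failed(e);
                iterator.remove();
            }
        }
    }

    public static void main(String[] args) {
        PendingAddEntriesSketch ml = new PendingAddEntriesSketch();
        Op inFlight = new Op();
        inFlight.initiate();
        Op queued = new Op();
        ml.pendingAddEntries.add(inFlight);
        ml.pendingAddEntries.add(queued);

        ml.clearNotInitiatedPendingAddEntries(new IllegalStateException("closed"));

        System.out.println("remaining=" + ml.pendingAddEntries.size()
                + " inFlightFailed=" + (inFlight.failure != null)
                + " queuedFailed=" + (queued.failure != null));
    }
}
```

A real test in this module would drive the same assertions through ManagedLedgerImpl (close or fence while adds are queued), rather than through a model like this.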
##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java:
##########
@@ -1937,6 +1942,9 @@ synchronized void ledgerClosed(final LedgerHandle lh, Long lastAddConfirmed) {
// The managed ledger was closed during the write operation
clearPendingAddEntries(new ManagedLedgerAlreadyClosedException("Managed ledger was already closed"));
return;
+ } else if (state == State.Fenced) {
+ clearPendingAddEntries(new ManagedLedgerFencedException("Managed ledger is fenced"));
Review Comment:
ledgerClosed() only checks `state == State.Fenced`, but
`State.FencedForDeletion` also returns true for state.isFenced(). In that case
pending add entries would still not be cleared, which can reintroduce the leak
for fenced-for-deletion. Consider using `state.isFenced()` here (and keep the
exception message aligned with the specific fenced state if needed).
```suggestion
} else if (state.isFenced()) {
clearPendingAddEntries(new ManagedLedgerFencedException(
"Managed ledger is fenced (" + state + ")"));
```
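The difference between the two checks can be seen in a small standalone sketch. The enum below is a trimmed-down stand-in, assuming only what this comment states (that `isFenced()` is true for both Fenced and FencedForDeletion); the helper method names are hypothetical.

```java
// Trimmed-down stand-in for the managed ledger State enum; illustrative only.
final class FencedStateSketch {
    enum State {
        LedgerOpened, Closed, Fenced, FencedForDeletion;

        boolean isFenced() {
            return this == Fenced || this == FencedForDeletion;
        }
    }

    // Mirrors the check in the PR: misses FencedForDeletion.
    static boolean clearsWithEqualityCheck(State state) {
        return state == State.Closed || state == State.Fenced;
    }

    // Mirrors the suggested check: covers every fenced state.
    static boolean clearsWithIsFenced(State state) {
        return state == State.Closed || state.isFenced();
    }

    public static void main(String[] args) {
        for (State s : State.values()) {
            System.out.println(s + ": equalityCheck=" + clearsWithEqualityCheck(s)
                    + " isFenced=" + clearsWithIsFenced(s));
        }
    }
}
```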
##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java:
##########
@@ -1751,20 +1752,24 @@ public void operationComplete(Void v, Stat stat) {
log.debug("[{}] Updating of ledgers list after create complete. version={}", name, stat);
}
ledgersStat = stat;
- synchronized (ManagedLedgerImpl.this) {
- LedgerHandle originalCurrentLedger = currentLedger;
- ledgers.put(lh.getId(), newLedger);
- currentLedger = lh;
- currentLedgerTimeoutTriggered = new AtomicBoolean();
- currentLedgerEntries = 0;
- currentLedgerSize = 0;
- updateLedgersIdsComplete(originalCurrentLedger);
- mbean.addLedgerSwitchLatencySample(System.currentTimeMillis()
- - lastLedgerCreationInitiationTimestamp, TimeUnit.MILLISECONDS);
- // May need to update the cursor position
- maybeUpdateCursorBeforeTrimmingConsumedLedger();
- }
- metadataMutex.unlock();
+ // make sure that pendingAddEntries' opeartions are executed in the same thread
Review Comment:
The comment has a typo: "opeartions" → "operations".
```suggestion
// make sure that pendingAddEntries' operations are executed in the same thread
```
##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java:
##########
@@ -1652,6 +1652,7 @@ public synchronized void asyncClose(final CloseCallback callback, final Object c
factory.close(this);
STATE_UPDATER.set(this, State.Closed);
+ executor.execute(() -> clearNotInitiatedPendingAddEntries(new ManagedLedgerException("Managed ledger is closed")));
Review Comment:
In asyncClose(), the new failure passed to
clearNotInitiatedPendingAddEntries() is a generic
ManagedLedgerException("Managed ledger is closed"). This makes failures for
queued-but-not-initiated adds inconsistent with other closed paths (e.g.,
internalAsyncAddEntry uses ManagedLedgerAlreadyClosedException) and can break
callers that key off the specific subtype. Consider using
ManagedLedgerAlreadyClosedException (or at least matching the existing close
message pattern) here.
```suggestion
executor.execute(() -> clearNotInitiatedPendingAddEntries(
        new ManagedLedgerAlreadyClosedException("Managed ledger is closed")));
```
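To show why the subtype matters to callers, here is a small standalone sketch. The two exception classes are local stand-ins that only mirror the subtype relationship, and `classify` is a hypothetical caller; neither is taken from the Pulsar code base.

```java
// Local stand-ins mirroring the exception subtype relationship; not the real Pulsar classes.
final class CloseFailureTypeSketch {
    static class ManagedLedgerException extends Exception {
        ManagedLedgerException(String message) { super(message); }
    }

    static class ManagedLedgerAlreadyClosedException extends ManagedLedgerException {
        ManagedLedgerAlreadyClosedException(String message) { super(message); }
    }

    // Hypothetical caller that decides what to do based on the failure subtype.
    static String classify(ManagedLedgerException e) {
        if (e instanceof ManagedLedgerAlreadyClosedException) {
            return "closed"; // e.g. stop retrying against this ledger
        }
        return "unexpected"; // generic failures fall through to default handling
    }

    public static void main(String[] args) {
        System.out.println(classify(new ManagedLedgerException("Managed ledger is closed")));
        System.out.println(classify(
                new ManagedLedgerAlreadyClosedException("Managed ledger is closed")));
    }
}
```

With the generic type, a caller of this shape cannot tell a close from any other failure even though the message text is the same.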