Denovo1998 commented on code in PR #25240:
URL: https://github.com/apache/pulsar/pull/25240#discussion_r2810380181
##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java:
##########
@@ -1751,20 +1753,32 @@ public void operationComplete(Void v, Stat stat) {
log.debug("[{}] Updating of ledgers list after create complete. version={}", name, stat);
}
ledgersStat = stat;
- synchronized (ManagedLedgerImpl.this) {
- LedgerHandle originalCurrentLedger = currentLedger;
- ledgers.put(lh.getId(), newLedger);
- currentLedger = lh;
- currentLedgerTimeoutTriggered = new AtomicBoolean();
- currentLedgerEntries = 0;
- currentLedgerSize = 0;
- updateLedgersIdsComplete(originalCurrentLedger);
- mbean.addLedgerSwitchLatencySample(System.currentTimeMillis()
- - lastLedgerCreationInitiationTimestamp, TimeUnit.MILLISECONDS);
- // May need to update the cursor position
- maybeUpdateCursorBeforeTrimmingConsumedLedger();
+ // make sure that pendingAddEntries' operations are executed in the same thread
+ // to avoid potential concurrent issues
+ State state = STATE_UPDATER.get(ManagedLedgerImpl.this);
+ if (state == State.Closed || state.isFenced()) {
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] Skipping ledger update after create complete because ledger is "
+ + "closed or fenced", name);
+ }
+ } else {
+ executor.execute(() -> {
+ synchronized (ManagedLedgerImpl.this) {
+ LedgerHandle originalCurrentLedger = currentLedger;
+ ledgers.put(lh.getId(), newLedger);
+ currentLedger = lh;
+ currentLedgerTimeoutTriggered = new AtomicBoolean();
+ currentLedgerEntries = 0;
+ currentLedgerSize = 0;
+ updateLedgersIdsComplete(originalCurrentLedger);
+ mbean.addLedgerSwitchLatencySample(System.currentTimeMillis()
+ - lastLedgerCreationInitiationTimestamp, TimeUnit.MILLISECONDS);
+ // May need to update the cursor position
+ maybeUpdateCursorBeforeTrimmingConsumedLedger();
+ }
+ metadataMutex.unlock();
Review Comment:
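The state check runs before executor.execute(), so the managed ledger can
become Closed or Fenced between that check and the moment the task runs.
Please re-check STATE_UPDATER inside the executor task and bail out if the
state is Closed or Fenced, while still releasing metadataMutex.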
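A minimal sketch of what I mean, not the actual ManagedLedgerImpl code. The
class LedgerSwitchSketch, its State enum, and the ledgerSwitches counter are
hypothetical stand-ins; a Semaphore plays the role of metadataMutex here
because the permit is released on a different thread than the one that
acquired it.

```java
import java.util.concurrent.Executor;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical, simplified stand-in for the ledger-switch path.
class LedgerSwitchSketch {
    enum State { Open, Closed, Fenced }

    final AtomicReference<State> state = new AtomicReference<>(State.Open);
    // Stand-in for metadataMutex; assumed to be held by the caller.
    final Semaphore metadataMutex = new Semaphore(1);
    int ledgerSwitches = 0;

    void scheduleLedgerSwitch(Executor executor) {
        executor.execute(() -> {
            try {
                // Re-check here: the state may have changed since the
                // task was submitted.
                State current = state.get();
                if (current == State.Closed || current == State.Fenced) {
                    return; // bail out; the finally block still unlocks
                }
                synchronized (this) {
                    ledgerSwitches++; // placeholder for the real ledger update
                }
            } finally {
                metadataMutex.release();
            }
        });
    }
}
```

With this shape the early return and the normal path both release the mutex
exactly once.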
##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java:
##########
@@ -2070,6 +2087,17 @@ void clearPendingAddEntries(ManagedLedgerException e) {
}
}
+ void clearNotInitiatedPendingAddEntries(ManagedLedgerException e) {
+ Iterator<OpAddEntry> iterator = pendingAddEntries.iterator();
+ while (iterator.hasNext()) {
+ OpAddEntry op = iterator.next();
+ if (op.closeIfNotInitiated()) {
+ op.failed(e);
+ iterator.remove();
Review Comment:
Consider calling `iterator.remove()` first, so that the op is already out of
the queue by the time `op.failed(e)` runs its failure callback.
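A rough sketch of the suggested ordering, under assumptions: PendingOpsSketch,
Op, and FailureCallback are hypothetical simplifications of the real classes,
closeIfNotInitiated() is reduced to a flag check, and I am assuming a
ConcurrentLinkedQueue, whose iterator supports remove().

```java
import java.util.Iterator;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical, simplified stand-in for the pending add-entry queue.
class PendingOpsSketch {
    interface FailureCallback {
        void onFailure(Op op, Exception e);
    }

    static final class Op {
        final boolean initiated;
        final FailureCallback callback;

        Op(boolean initiated, FailureCallback callback) {
            this.initiated = initiated;
            this.callback = callback;
        }

        // Simplified: the real method presumably also changes op state.
        boolean closeIfNotInitiated() {
            return !initiated;
        }

        void failed(Exception e) {
            callback.onFailure(this, e);
        }
    }

    final Queue<Op> pendingAddEntries = new ConcurrentLinkedQueue<>();

    void clearNotInitiatedPendingAddEntries(Exception e) {
        Iterator<Op> iterator = pendingAddEntries.iterator();
        while (iterator.hasNext()) {
            Op op = iterator.next();
            if (op.closeIfNotInitiated()) {
                // Remove first, so the callback never sees the op queued.
                iterator.remove();
                op.failed(e);
            }
        }
    }
}
```

If the failure callback inspects or re-enters the queue, it then sees a state
that no longer contains the failed op.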
##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java:
##########
@@ -1751,20 +1753,32 @@ public void operationComplete(Void v, Stat stat) {
log.debug("[{}] Updating of ledgers list after create complete. version={}", name, stat);
}
ledgersStat = stat;
- synchronized (ManagedLedgerImpl.this) {
- LedgerHandle originalCurrentLedger = currentLedger;
- ledgers.put(lh.getId(), newLedger);
- currentLedger = lh;
- currentLedgerTimeoutTriggered = new AtomicBoolean();
- currentLedgerEntries = 0;
- currentLedgerSize = 0;
- updateLedgersIdsComplete(originalCurrentLedger);
- mbean.addLedgerSwitchLatencySample(System.currentTimeMillis()
- - lastLedgerCreationInitiationTimestamp, TimeUnit.MILLISECONDS);
- // May need to update the cursor position
- maybeUpdateCursorBeforeTrimmingConsumedLedger();
+ // make sure that pendingAddEntries' operations are executed in the same thread
+ // to avoid potential concurrent issues
+ State state = STATE_UPDATER.get(ManagedLedgerImpl.this);
+ if (state == State.Closed || state.isFenced()) {
Review Comment:
The metadataMutex is acquired in updateLedgersListAfterRollover() and must
always be released. In operationComplete(), the "Closed/Fenced" branch skips
the ledger update but fails to call metadataMutex.unlock(), causing a lock leak
that may deadlock subsequent metadata operations. Ensure unlock() is invoked in
all branches, for example, by using a finally block.
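One possible shape for this, as a sketch only. MetadataCallbackSketch, the
closedOrFenced flag, and the handedOff variable are hypothetical names, and a
Semaphore stands in for metadataMutex. The idea is that the callback releases
the mutex itself unless it has successfully handed that responsibility to the
executor task.

```java
import java.util.concurrent.Executor;
import java.util.concurrent.Semaphore;

// Hypothetical, simplified stand-in for the operationComplete() callback.
class MetadataCallbackSketch {
    // Stand-in for metadataMutex; assumed to be held when the callback runs.
    final Semaphore metadataMutex = new Semaphore(1);
    volatile boolean closedOrFenced;

    void operationComplete(Executor executor, Runnable ledgerUpdate) {
        boolean handedOff = false;
        try {
            if (closedOrFenced) {
                return; // skip the update; finally releases the mutex
            }
            executor.execute(() -> {
                try {
                    ledgerUpdate.run();
                } finally {
                    metadataMutex.release();
                }
            });
            handedOff = true; // the task now owns the release
        } finally {
            if (!handedOff) {
                // Covers the skip branch and a rejected execute() call.
                metadataMutex.release();
            }
        }
    }
}
```

Either way the mutex is released exactly once, whichever branch is taken.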