This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-3.3 in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-3.3 by this push: new 924042396bd [fix][broker] Fix ManagedCursor state management race conditions and lifecycle issues (#24569) 924042396bd is described below commit 924042396bd201447bd0c6bf55be84eda9fc000f Author: Lari Hotari <lhot...@users.noreply.github.com> AuthorDate: Tue Jul 29 14:07:54 2025 +0300 [fix][broker] Fix ManagedCursor state management race conditions and lifecycle issues (#24569) (cherry picked from commit c96f27a4dde9e4211a5b83e91f5e384d7ac8d904) --- .../bookkeeper/mledger/impl/ManagedCursorImpl.java | 434 ++++++++++++--------- .../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 37 +- .../mledger/impl/NonDurableCursorImpl.java | 2 + .../bookkeeper/mledger/impl/OpReadEntry.java | 34 +- .../mledger/impl/ReadOnlyCursorImpl.java | 2 + .../service/persistent/PersistentSubscription.java | 10 + .../pulsar/broker/service/ReplicatorTest.java | 34 +- 7 files changed, 343 insertions(+), 210 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java index fd7a7ce4e60..74b81153621 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java @@ -49,7 +49,6 @@ import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -156,17 +155,10 @@ public class ManagedCursorImpl implements ManagedCursor { MarkDeleteEntry.class, "lastMarkDeleteEntry"); protected volatile MarkDeleteEntry lastMarkDeleteEntry; - /** Protects the method 
"asyncReadEntriesWithSkipOrWait" and "cancelPendingReadRequest" runs concurrently. **/ - private final Object pendingReadOpMutex = new Object(); - /** - * 'ManagedLedger.notifyCursors' relies on this CAS to avoid using "pendingReadOpMutex" to guarantee thread-safety, - * which improved the performance of publishing messages. - */ protected static final AtomicReferenceFieldUpdater<ManagedCursorImpl, OpReadEntry> WAITING_READ_OP_UPDATER = AtomicReferenceFieldUpdater.newUpdater(ManagedCursorImpl.class, OpReadEntry.class, "waitingReadOp"); @SuppressWarnings("unused") private volatile OpReadEntry waitingReadOp = null; - private DelayCheckForNewEntriesTask delayCheckForNewEntriesTask; public static final int FALSE = 0; public static final int TRUE = 1; @@ -238,6 +230,11 @@ public class ManagedCursorImpl implements ManagedCursor { // active state cache in ManagedCursor. It should be in sync with the state in activeCursors in ManagedLedger. private volatile boolean isActive = false; + // This is a lock used to update the registration state of the cursor in the managed ledger. 
+ private final Object registerToWaitingCursorsLock = new Object(); + // This is used to track if the cursor is registered in the managed ledger's waitingCursors queue + boolean registeredToWaitingCursors = false; + class MarkDeleteEntry { final PositionImpl newPosition; final MarkDeleteCallback callback; @@ -295,17 +292,38 @@ public class ManagedCursorImpl implements ManagedCursor { private volatile long lastActive; public enum State { - Uninitialized, // Cursor is being initialized - NoLedger, // There is no metadata ledger open for writing - Open, // Metadata ledger is ready - SwitchingLedger, // The metadata ledger is being switched - Closing, // The managed cursor is closing - Closed // The managed cursor has been closed + Uninitialized(false), // Cursor is being initialized + NoLedger(false), // There is no metadata ledger open for writing + Open(false), // Metadata ledger is ready + SwitchingLedger(false), // The metadata ledger is being switched + Closing(true), // The managed cursor is closing + Closed(true), // The managed cursor has been closed + Deleting(true), // The managed cursor is being deleted + Deleted(true), // The managed cursor has been deleted + DeletingFailed(true); // The managed cursor deletion failed, state allows retrying deletion. + + // Indicate if the cursor is in a state that is considered closed + private final boolean closedState; + + State(boolean closedState) { + this.closedState = closedState; + } + + /** + * Returns true if the state is considered closed. 
+ */ + public boolean isClosed() { + return closedState; + } + + public boolean isDeletingOrDeleted() { + return this == Deleting || this == Deleted; + } } protected static final AtomicReferenceFieldUpdater<ManagedCursorImpl, State> STATE_UPDATER = AtomicReferenceFieldUpdater.newUpdater(ManagedCursorImpl.class, State.class, "state"); - protected volatile State state = null; + protected volatile State state = State.Uninitialized; protected final ManagedCursorMXBean mbean; @@ -329,7 +347,6 @@ public class ManagedCursorImpl implements ManagedCursor { this.batchDeletedIndexes = null; } this.digestType = BookKeeper.DigestType.fromApiDigestType(getConfig().getDigestType()); - STATE_UPDATER.set(this, State.Uninitialized); PENDING_MARK_DELETED_SUBMITTED_COUNT_UPDATER.set(this, 0); PENDING_READ_OPS_UPDATER.set(this, 0); RESET_CURSOR_IN_PROGRESS_UPDATER.set(this, FALSE); @@ -787,7 +804,23 @@ public class ManagedCursorImpl implements ManagedCursor { // assign cursor-ledger so, it can be deleted when new ledger will be switched this.cursorLedger = recoveredFromCursorLedger; this.isCursorLedgerReadOnly = true; - STATE_UPDATER.set(this, State.NoLedger); + changeStateIfNotClosed(State.NoLedger); + } + + /** + * Change the state of the cursor if it is not already considered closed. + * This is to prevent invalid state transitions when the cursor is already closed. 
+ * + * @param newState The new state to set + * @return The previous state of the cursor + */ + private State changeStateIfNotClosed(State newState) { + return STATE_UPDATER.getAndUpdate(this, current -> { + if (current.isClosed()) { + return current; + } + return newState; + }); } void initialize(PositionImpl position, Map<String, Long> properties, Map<String, String> cursorProperties, @@ -801,7 +834,7 @@ public class ManagedCursorImpl implements ManagedCursor { new MetaStoreCallback<>() { @Override public void operationComplete(Void result, Stat stat) { - STATE_UPDATER.set(ManagedCursorImpl.this, State.NoLedger); + changeStateIfNotClosed(State.NoLedger); callback.operationComplete(); } @Override @@ -1036,87 +1069,36 @@ public class ManagedCursorImpl implements ManagedCursor { skipCondition = skipCondition == null ? this::isMessageDeleted : skipCondition.or(this::isMessageDeleted); OpReadEntry op = OpReadEntry.create(this, readPosition, numberOfEntriesToRead, callback, ctx, maxPosition, skipCondition); - - synchronized (pendingReadOpMutex) { - if (!WAITING_READ_OP_UPDATER.compareAndSet(this, null, op)) { - op.recycle(); - callback.readEntriesFailed(new ManagedLedgerException.ConcurrentWaitCallbackException(), ctx); - return; - } - - if (log.isDebugEnabled()) { - log.debug("[{}] [{}] Deferring retry of read at position {}", ledger.getName(), name, - op.readPosition); - } - - // Check again for new entries after the configured time, then if still no entries are available - // register to be notified. 
- if (getConfig().getNewEntriesCheckDelayInMillis() > 0) { - delayCheckForNewEntriesTask = new DelayCheckForNewEntriesTask(op, callback, ctx); - } else { - // If there's no delay, check directly from the same thread - checkForNewEntries(op, callback, ctx); - } + int opReadId = op.id; + if (!WAITING_READ_OP_UPDATER.compareAndSet(this, null, op)) { + op.recycle(); + callback.readEntriesFailed(new ManagedLedgerException.ConcurrentWaitCallbackException(), ctx); + return; } - } - } - - private enum DelayCheckForNewEntriesTaskState { - INIT, RUNNING, CANCELLED, DONE - } - - private class DelayCheckForNewEntriesTask implements Runnable { - - private final OpReadEntry op; - private final ReadEntriesCallback callback; - private final Object ctx; - private final ScheduledFuture<?> scheduledFuture; - private DelayCheckForNewEntriesTaskState state = DelayCheckForNewEntriesTaskState.INIT; - public DelayCheckForNewEntriesTask(OpReadEntry op, ReadEntriesCallback callback, Object ctx) { - this.op = op; - this.callback = callback; - this.ctx = ctx; - scheduledFuture = ledger.getScheduledExecutor().schedule(this, - getConfig().getNewEntriesCheckDelayInMillis(), TimeUnit.MILLISECONDS); - } - - @Override - public void run() { - synchronized (pendingReadOpMutex) { - if (state != DelayCheckForNewEntriesTaskState.INIT) { - return; - } - state = DelayCheckForNewEntriesTaskState.RUNNING; - checkForNewEntries(op, callback, ctx); - state = DelayCheckForNewEntriesTaskState.DONE; + if (log.isDebugEnabled()) { + log.debug("[{}] [{}] Deferring retry of read at position {}", ledger.getName(), name, op.readPosition); } - } - - public boolean isDone() { - return state == DelayCheckForNewEntriesTaskState.DONE; - } - public boolean cancel() { - synchronized (pendingReadOpMutex) { - // Not all implementations of Executor guarantee that the Runnable will be no long be executed after a - // successful cancel, such as Guava MoreExecutors, see also https://github.com/google/guava/blob - // 
/v32.1.2/guava/src/com/google/common/util/concurrent/MoreExecutors.java#L709. - // The current task guarantees. - if (state != DelayCheckForNewEntriesTaskState.INIT) { - return false; - } - state = DelayCheckForNewEntriesTaskState.CANCELLED; - scheduledFuture.cancel(false); - return true; + // Check again for new entries after the configured time, then if still no entries are available register + // to be notified + if (getConfig().getNewEntriesCheckDelayInMillis() > 0) { + ledger.getScheduledExecutor().schedule(() -> checkForNewEntries(opReadId, op, callback, ctx), + getConfig().getNewEntriesCheckDelayInMillis(), TimeUnit.MILLISECONDS); + } else { + // If there's no delay, check directly from the same thread + checkForNewEntries(opReadId, op, callback, ctx); } } } - private void checkForNewEntries(OpReadEntry op, ReadEntriesCallback callback, Object ctx) { + // Please notice that OpReadEntry might be recycled due to sharing via waitingReadOp field logic + // That's why the fields cannot be accessed before the reference is removed from waitingReadOp atomically + // and the id matches the removed reference. + private void checkForNewEntries(int opReadId, OpReadEntry op, ReadEntriesCallback callback, Object ctx) { try { if (log.isDebugEnabled()) { - log.debug("[{}] [{}] Re-trying the read at position {}", ledger.getName(), name, op.readPosition); + log.debug("[{}] [{}] Re-trying the read for op id {}", ledger.getName(), name, opReadId); } if (isClosed()) { @@ -1145,17 +1127,33 @@ public class ManagedCursorImpl implements ManagedCursor { log.debug("[{}] [{}] Found more entries", ledger.getName(), name); } // Try to cancel the notification request - if (WAITING_READ_OP_UPDATER.compareAndSet(this, op, null)) { - ledger.removeWaitingCursor(this); + // Clear the waiting read op only if it matches the current instance and the id matches + // the opReadId parameter. 
This avoids recycled OpReadEntry instances from matching since their + // ids would be different after recycling. + OpReadEntry waitingReadOpItem = WAITING_READ_OP_UPDATER.getAndUpdate(this, + current -> { + if (current == op && current.id == opReadId) { + // update the value to null to cancel the waiting read op + return null; + } else { + // keep the current waiting read op value + return current; + } + }); + // If the waiting read op was the same as the one we are trying to cancel, it means that it was now + // cleared from the waitingReadOp field and therefore "cancelled" + if (waitingReadOpItem == op && waitingReadOpItem.id == opReadId) { if (log.isDebugEnabled()) { log.debug("[{}] [{}] Cancelled notification and scheduled read at {}", ledger.getName(), - name, op.readPosition); + name, op.readPosition); } PENDING_READ_OPS_UPDATER.incrementAndGet(this); ledger.asyncReadEntries(op); } else { - log.info("[{}] [{}] notification that new entries added was already be called, skipped the current" - + " new entry checking", ledger.getName(), name); + if (log.isDebugEnabled()) { + log.debug("[{}] [{}] notification was already cancelled for op id {}", ledger.getName(), name, + opReadId); + } } } else if (ledger.isTerminated()) { // At this point we registered for notification and still there were no more available @@ -1170,7 +1168,7 @@ public class ManagedCursorImpl implements ManagedCursor { @Override public boolean isClosed() { - return state == State.Closed || state == State.Closing; + return state.isClosed(); } @Override @@ -1178,53 +1176,21 @@ public class ManagedCursorImpl implements ManagedCursor { if (log.isDebugEnabled()) { log.debug("[{}] [{}] Cancel pending read request", ledger.getName(), name); } - synchronized (pendingReadOpMutex) { - final OpReadEntry op = WAITING_READ_OP_UPDATER.get(this); - // Case 1: the pending read has executed, or there is no pending read. 
- if (op == null) { - return false; - } - Function<Boolean, Boolean> clearWaitingReadOp = removeWaitingCursor -> { - if (WAITING_READ_OP_UPDATER.compareAndSet(this, op, null)) { - if (removeWaitingCursor) { - ledger.removeWaitingCursor(this); - } - op.recycle(); - return true; - } else { - // Managed ledger has noticed that new entries was added. - if (WAITING_READ_OP_UPDATER.get(this) == null) { - return false; - } - // It will never occur, it means the lock "pendingReadOpMutex" does not work as expected, which - // allowed other thread to modify the "waitingReadOp" concurrently. - // The waitingReadOp has been modified to other instance, which will never occur. - log.warn("[{}] [{}] Cancel pending request encountered an unexpected error, the lock" - + " \"pendingReadOpMutex\" does not work as expected, which allowed other" - + " thread to modify the \"waitingReadOp\" concurrently..", ledger.getName(), name); - return cancelPendingReadRequest(); - } - }; - // There is a pending read, - // Case 2: delayCheckForNewEntriesTask can be cancelled, no need to remove cursor from "ml.waitingCursors", - // because it has not added successfully yet. - if (delayCheckForNewEntriesTask != null && delayCheckForNewEntriesTask.cancel()) { - return clearWaitingReadOp.apply(false); - } - // Case 3: managedLedgerNewEntriesCheckDelayInMillis is "0". - // Case 4: delayCheckForNewEntriesTask has done, which has added cursor into "ml.waitingCursors". - if (delayCheckForNewEntriesTask == null || delayCheckForNewEntriesTask.isDone()) { - return clearWaitingReadOp.apply(true); + final OpReadEntry op = WAITING_READ_OP_UPDATER.getAndUpdate(this, current -> { + if (current == OpReadEntry.WAITING_READ_OP_FOR_CLOSED_CURSOR) { + return current; } - // Case 5: delayCheckForNewEntriesTask is running, but not done, which only occurs at a corner case. It - // only happens when the task is starting. 
Calling "cancelPendingReadRequest" here will release the lock - // "pendingReadOpMutex" and let the task go ahead. - return cancelPendingReadRequest(); + return null; + }); + if (op != null) { + op.recycle(); } + return op != null && op != OpReadEntry.WAITING_READ_OP_FOR_CLOSED_CURSOR; } public boolean hasPendingReadRequest() { - return WAITING_READ_OP_UPDATER.get(this) != null; + OpReadEntry opReadEntry = WAITING_READ_OP_UPDATER.get(this); + return opReadEntry != null && opReadEntry != OpReadEntry.WAITING_READ_OP_FOR_CLOSED_CURSOR; } @Override @@ -2242,7 +2208,7 @@ public class ManagedCursorImpl implements ManagedCursor { // We cannot write to the ledger during the switch, need to wait until the new metadata ledger is available synchronized (pendingMarkDeleteOps) { // The state might have changed while we were waiting on the queue mutex - switch (STATE_UPDATER.get(this)) { + switch (state) { case Closed: callback.markDeleteFailed(new ManagedLedgerException .CursorAlreadyClosedException("Cursor was already closed"), ctx); @@ -2373,14 +2339,14 @@ public class ManagedCursorImpl implements ManagedCursor { } }; - if (State.NoLedger.equals(STATE_UPDATER.get(this))) { + if (state == State.NoLedger) { if (ledger.isNoMessagesAfterPos(mdEntry.newPosition)) { persistPositionToMetaStore(mdEntry, cb); } else { cb.operationFailed(new ManagedLedgerException("Switch new cursor ledger failed")); } } else { - persistPositionToLedger(cursorLedger, mdEntry, cb); + persistPositionToLedger(cursorLedger, mdEntry, cb, false); } } @@ -2825,7 +2791,7 @@ public class ManagedCursorImpl implements ManagedCursor { ledger.getName(), name, cursorLedger.getId(), e.getMessage()); callback.closeFailed(e, ctx); } - }); + }, true); } else { persistPositionMetaStore(-1, position, properties, new MetaStoreCallback<Void>() { @Override @@ -2952,12 +2918,18 @@ public class ManagedCursorImpl implements ManagedCursor { callback.closeComplete(ctx); return; } + closeWaitingCursor(); + setInactive(); 
persistPositionWhenClosing(lastMarkDeleteEntry.newPosition, lastMarkDeleteEntry.properties, new AsyncCallbacks.CloseCallback(){ @Override public void closeComplete(Object ctx) { - STATE_UPDATER.set(ManagedCursorImpl.this, State.Closed); + if (!STATE_UPDATER.compareAndSet(ManagedCursorImpl.this, State.Closing, State.Closed)) { + log.warn("[{}] [{}] State was modified from closing to {} while closing", ledger.getName(), + name, state); + state = State.Closed; + } callback.closeComplete(ctx); } @@ -2970,6 +2942,19 @@ public class ManagedCursorImpl implements ManagedCursor { }, ctx); } + protected void closeWaitingCursor() { + synchronized (registerToWaitingCursorsLock) { + if (registeredToWaitingCursors) { + ledger.removeWaitingCursor(this); + } + } + OpReadEntry opReadEntry = WAITING_READ_OP_UPDATER.getAndSet(this, + OpReadEntry.WAITING_READ_OP_FOR_CLOSED_CURSOR); + if (opReadEntry != null && opReadEntry != OpReadEntry.WAITING_READ_OP_FOR_CLOSED_CURSOR) { + opReadEntry.readEntriesFailed(new CursorAlreadyClosedException("Cursor is closing"), opReadEntry.ctx); + } + } + /** * Internal version of seek that doesn't do the validation check. 
* @@ -3021,8 +3006,8 @@ public class ManagedCursorImpl implements ManagedCursor { void startCreatingNewMetadataLedger() { // Change the state so that new mark-delete ops will be queued and not immediately submitted - State oldState = STATE_UPDATER.getAndSet(this, State.SwitchingLedger); - if (oldState == State.SwitchingLedger) { + State oldState = changeStateIfNotClosed(State.SwitchingLedger); + if (oldState == State.SwitchingLedger || oldState.isClosed()) { // Ignore double request return; } @@ -3042,7 +3027,7 @@ public class ManagedCursorImpl implements ManagedCursor { flushPendingMarkDeletes(); // Resume normal mark-delete operations - STATE_UPDATER.set(ManagedCursorImpl.this, State.Open); + changeStateIfNotClosed(State.Open); } } @@ -3053,7 +3038,7 @@ public class ManagedCursorImpl implements ManagedCursor { synchronized (pendingMarkDeleteOps) { // At this point we don't have a ledger ready - STATE_UPDATER.set(ManagedCursorImpl.this, State.NoLedger); + changeStateIfNotClosed(State.NoLedger); // There are two case may cause switch ledger fails. // 1. No enough BKs; BKs are in read-only mode... // 2. Write ZK fails. @@ -3077,21 +3062,8 @@ public class ManagedCursorImpl implements ManagedCursor { * @return false if the {@link #state} already is {@link State#Closing} or {@link State#Closed}. 
*/ private boolean trySetStateToClosing() { - final AtomicBoolean notClosing = new AtomicBoolean(false); - STATE_UPDATER.updateAndGet(this, state -> { - switch (state){ - case Closing: - case Closed: { - notClosing.set(false); - return state; - } - default: { - notClosing.set(true); - return State.Closing; - } - } - }); - return notClosing.get(); + State previousState = changeStateIfNotClosed(State.Closing); + return !previousState.isClosed(); } private void flushPendingMarkDeletes() { @@ -3134,7 +3106,7 @@ public class ManagedCursorImpl implements ManagedCursor { deleteLedgerAsync(newLedgerHandle); callback.operationFailed(exception); } - }); + }, false); }).whenComplete((result, e) -> { ledger.mbean.endCursorLedgerCreateOp(); if (e != null) { @@ -3293,7 +3265,8 @@ public class ManagedCursorImpl implements ManagedCursor { } } - void persistPositionToLedger(final LedgerHandle lh, MarkDeleteEntry mdEntry, final VoidCallback callback) { + void persistPositionToLedger(final LedgerHandle lh, MarkDeleteEntry mdEntry, final VoidCallback callback, + boolean ignoreClosedStateAfterFailure) { PositionImpl position = mdEntry.newPosition; Builder piBuilder = PositionInfo.newBuilder().setLedgerId(position.getLedgerId()) .setEntryId(position.getEntryId()) @@ -3347,7 +3320,7 @@ public class ManagedCursorImpl implements ManagedCursor { mbean.addWriteCursorLedgerSize(data.length); callback.operationComplete(); } else { - if (state == State.Closed) { + if (!ignoreClosedStateAfterFailure && state.isClosed()) { // After closed the cursor, the in-progress persistence task will get a // BKException.Code.LedgerClosedException. 
callback.operationFailed(new CursorAlreadyClosedException(String.format("%s %s skipped this" @@ -3368,8 +3341,7 @@ public class ManagedCursorImpl implements ManagedCursor { public boolean periodicRollover() { LedgerHandle lh = cursorLedger; - if (State.Open.equals(STATE_UPDATER.get(this)) - && lh != null && lh.getLength() > 0) { + if (state == State.Open && lh != null && lh.getLength() > 0) { boolean triggered = rolloverLedgerIfNeeded(lh); if (triggered) { log.info("[{}] Periodic rollover triggered for cursor {} (length={} bytes)", @@ -3427,7 +3399,7 @@ public class ManagedCursorImpl implements ManagedCursor { if (ledger.getFactory().isMetadataServiceAvailable() && (lh.getLastAddConfirmed() >= getConfig().getMetadataMaxEntriesPerLedger() || lastLedgerSwitchTimestamp < (now - getConfig().getLedgerRolloverTimeout() * 1000)) - && (STATE_UPDATER.get(this) != State.Closed && STATE_UPDATER.get(this) != State.Closing)) { + && !state.isClosed()) { // It's safe to modify the timestamp since this method will be only called from a callback, implying that // calls will be serialized on one single thread lastLedgerSwitchTimestamp = now; @@ -3475,7 +3447,21 @@ public class ManagedCursorImpl implements ManagedCursor { if (log.isDebugEnabled()) { log.debug("[{}] [{}] Received ml notification", ledger.getName(), name); } - OpReadEntry opReadEntry = WAITING_READ_OP_UPDATER.getAndSet(this, null); + + OpReadEntry opReadEntry = WAITING_READ_OP_UPDATER.getAndUpdate(this, current -> { + // if the waitingReadOp is WAITING_READ_OP_FOR_CLOSED_CURSOR, keep it as is + if (current == OpReadEntry.WAITING_READ_OP_FOR_CLOSED_CURSOR) { + return current; + } else { + // Otherwise, clear the waiting read operation + return null; + } + }); + + // ignore the notification if the cursor is already closed + if (opReadEntry == OpReadEntry.WAITING_READ_OP_FOR_CLOSED_CURSOR) { + return; + } if (opReadEntry != null) { if (log.isDebugEnabled()) { @@ -3484,7 +3470,15 @@ public class ManagedCursorImpl 
implements ManagedCursor { log.debug("[{}] Consumer {} cursor notification: other counters: consumed {} mdPos {} rdPos {}", ledger.getName(), name, messagesConsumedCounter, markDeletePosition, readPosition); } - + if (isClosed()) { + // If the cursor is closed, we should not read any more entries + if (log.isDebugEnabled()) { + log.debug("[{}] [{}] Cursor is already closed, ignoring notification", ledger.getName(), name); + } + opReadEntry.readEntriesFailed(new ManagedLedgerException.CursorAlreadyClosedException( + "Cursor was already closed"), opReadEntry.ctx); + return; + } PENDING_READ_OPS_UPDATER.incrementAndGet(this); opReadEntry.readPosition = (PositionImpl) getReadPosition(); ledger.asyncReadEntries(opReadEntry); @@ -3519,7 +3513,6 @@ public class ManagedCursorImpl implements ManagedCursor { void decrementPendingMarkDeleteCount() { if (PENDING_MARK_DELETED_SUBMITTED_COUNT_UPDATER.decrementAndGet(this) == 0) { - final State state = STATE_UPDATER.get(this); if (state == State.SwitchingLedger) { // A metadata ledger switch was pending and now we can do it since we don't have any more // outstanding mark-delete requests @@ -3531,7 +3524,7 @@ public class ManagedCursorImpl implements ManagedCursor { void readOperationCompleted() { if (PENDING_READ_OPS_UPDATER.decrementAndGet(this) == 0) { synchronized (pendingMarkDeleteOps) { - if (STATE_UPDATER.get(this) == State.Open) { + if (state == State.Open) { // Flush the pending writes only if the state is open. flushPendingMarkDeletes(); } else if (PENDING_MARK_DELETED_SUBMITTED_COUNT_UPDATER.get(this) != 0) { @@ -3578,13 +3571,25 @@ public class ManagedCursorImpl implements ManagedCursor { } private void asyncDeleteCursorLedger(int retry) { - STATE_UPDATER.set(this, State.Closed); + State beforeChangingState = changeStateToDeletingIfNotDeleted(); + if (beforeChangingState == State.Deleted) { + log.warn("[{}-{}] Cursor ledger is already deleted. 
state={}", ledger.getName(), name, + beforeChangingState); + return; + } - if (cursorLedger == null || retry <= 0) { - if (cursorLedger != null) { - log.warn("[{}-{}] Failed to delete ledger after retries {}", ledger.getName(), name, - cursorLedger.getId()); - } + closeWaitingCursor(); + + if (cursorLedger == null) { + log.warn("[{}-{}] There's no cursor ledger available for deletion.", ledger.getName(), name); + state = State.DeletingFailed; + return; + } + + if (retry <= 0) { + log.warn("[{}-{}] Failed to delete ledger after retries {}", ledger.getName(), name, + cursorLedger.getId()); + state = State.DeletingFailed; return; } @@ -3592,18 +3597,36 @@ public class ManagedCursorImpl implements ManagedCursor { bookkeeper.asyncDeleteLedger(cursorLedger.getId(), (rc, ctx) -> { ledger.mbean.endCursorLedgerDeleteOp(); if (rc == BKException.Code.OK) { + state = State.Deleted; log.info("[{}][{}] Deleted cursor ledger {}", ledger.getName(), name, cursorLedger.getId()); } else { log.warn("[{}][{}] Failed to delete ledger {}: {}", ledger.getName(), name, cursorLedger.getId(), BKException.getMessage(rc)); if (!isNoSuchLedgerExistsException(rc)) { + state = State.DeletingFailed; ledger.getScheduledExecutor().schedule(() -> asyncDeleteCursorLedger(retry - 1), DEFAULT_LEDGER_DELETE_BACKOFF_TIME_SEC, TimeUnit.SECONDS); + } else { + state = State.Deleted; } } }, null); } + /** + * Change the state to {@link State#Deleting} if the current state is not {@link State#Deleted}. + * @return The state before changing. + */ + State changeStateToDeletingIfNotDeleted() { + return STATE_UPDATER.getAndUpdate(this, current -> { + // don't change the state if it's already deleted + if (current == State.Deleted) { + return current; + } + return State.Deleting; + }); + } + /** * return BK error codes that are considered not likely to be recoverable. 
*/ @@ -3750,7 +3773,7 @@ public class ManagedCursorImpl implements ManagedCursor { } public String getState() { - return STATE_UPDATER.get(this).toString(); + return state.toString(); } @Override @@ -3872,8 +3895,8 @@ public class ManagedCursorImpl implements ManagedCursor { } @VisibleForTesting - public void setState(State state) { - this.state = state; + public State getAndSetState(State state) { + return STATE_UPDATER.getAndSet(this, state); } public void setCacheReadEntry(boolean cacheReadEntry) { @@ -3919,4 +3942,55 @@ public class ManagedCursorImpl implements ManagedCursor { } return newNonDurableCursor; } + + /** + * Called by ManagedLedgerImpl to execute the Runnable inside the lock to remove the cursor from its + * waiting cursors list. + * The cursor state is set to unregistered, and it can be registered again for waiting in ManagedLedgerImpl. + */ + void removeWaitingCursorRequested(Runnable removeWaitingCursorRunnable) { + synchronized (registerToWaitingCursorsLock) { + if (!registeredToWaitingCursors) { + // The cursor hasn't been registered, do not attempt to remove + if (log.isDebugEnabled()) { + log.debug("[{}] Skipping removing cursor {} from waiting cursors since it's not registered.", + ledger.getName(), name); + } + return; + } + if (log.isDebugEnabled()) { + log.debug("[{}] Removing cursor {} from waiting cursors", ledger.getName(), name); + } + removeWaitingCursorRunnable.run(); + registeredToWaitingCursors = false; + } + } + + /** + * Called by ManagedLedgerImpl to notify that the cursor has been dequeued from the waiting cursors list. + */ + void notifyWaitingCursorDequeued() { + synchronized (registerToWaitingCursorsLock) { + registeredToWaitingCursors = false; + } + } + + /** + * Called by ManagedLedgerImpl to execute the Runnable inside the lock to add the cursor to its + * waiting cursors list. + * This method is used to ensure that the cursor is not already registered, which would result in duplicates.
+ */ + void addWaitingCursorRequested(Runnable addWaitingCursorRunnable) { + synchronized (registerToWaitingCursorsLock) { + if (registeredToWaitingCursors || isClosed()) { + // The cursor is already registered or closed, do not register again. + return; + } + if (log.isDebugEnabled()) { + log.debug("[{}] Adding cursor {} to waiting cursors", ledger.getName(), name); + } + addWaitingCursorRunnable.run(); + registeredToWaitingCursors = true; + } + } } diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index 80a070287de..7a82087503f 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -1053,15 +1053,29 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { callback.deleteCursorFailed(new ManagedLedgerException.CursorNotFoundException("ManagedCursor not found: " + consumerName), ctx); return; - } else if (!cursor.isDurable()) { - cursor.setState(ManagedCursorImpl.State.Closed); - cursor.cancelPendingReadRequest(); + } + + // Non-durable cursors can be closed and removed immediately + if (!cursor.isDurable()) { + try { + cursor.close(); + } catch (Exception e) { + log.warn("[{}] Failed to close non-durable cursor {}", name, consumerName, e); + } cursors.removeCursor(consumerName); - deactivateCursorByName(consumerName); callback.deleteCursorComplete(ctx); return; } + // If the cursor is active, we need to deactivate it first + cursor.setInactive(); + // Set the state to deleting (which is a closed state) to avoid any new writes + ManagedCursorImpl.State beforeChangingState = cursor.changeStateToDeletingIfNotDeleted(); + if (beforeChangingState.isDeletingOrDeleted()) { + log.warn("[{}] [{}] Cursor is already being deleted or has been deleted.", name, consumerName); + 
return; + } + // First remove the consumer form the MetaStore. If this operation succeeds and the next one (removing the // ledger from BK) don't, we end up having a loose ledger leaked but the state will be consistent. store.asyncRemoveCursor(ManagedLedgerImpl.this.name, consumerName, new MetaStoreCallback<Void>() { @@ -1069,7 +1083,6 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { public void operationComplete(Void result, Stat stat) { cursor.asyncDeleteCursorLedger(); cursors.removeCursor(consumerName); - deactivateCursorByName(consumerName); trimConsumedLedgersInBackground(); @@ -1079,7 +1092,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { @Override public void operationFailed(MetaStoreException e) { - handleBadVersion(e); + cursor.getAndSetState(ManagedCursorImpl.State.DeletingFailed); callback.deleteCursorFailed(e, ctx); } @@ -2441,7 +2454,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { if (waitingCursor == null) { break; } - + waitingCursor.notifyWaitingCursorDequeued(); executor.execute(waitingCursor::notifyEntriesAvailable); } } @@ -3939,11 +3952,17 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { public void removeWaitingCursor(ManagedCursor cursor) { - this.waitingCursors.remove(cursor); + ((ManagedCursorImpl) cursor).removeWaitingCursorRequested(() -> { + // remove only if the cursor has been registered + this.waitingCursors.remove(cursor); + }); } public void addWaitingCursor(ManagedCursorImpl cursor) { - this.waitingCursors.add(cursor); + cursor.addWaitingCursorRequested(() -> { + // add only if the cursor has not been registered + this.waitingCursors.add(cursor); + }); } public boolean isCursorActive(ManagedCursor cursor) { diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorImpl.java index 
734eab20bc5..7dbf5400da1 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorImpl.java @@ -111,6 +111,8 @@ public class NonDurableCursorImpl extends ManagedCursorImpl { @Override public void asyncClose(CloseCallback callback, Object ctx) { STATE_UPDATER.set(this, State.Closed); + closeWaitingCursor(); + setInactive(); callback.closeComplete(ctx); } diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java index a250d259553..20c7742129b 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java @@ -22,6 +22,7 @@ import io.netty.util.Recycler; import io.netty.util.Recycler.Handle; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Predicate; import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback; import org.apache.bookkeeper.mledger.Entry; @@ -33,7 +34,14 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; class OpReadEntry implements ReadEntriesCallback { - + static final OpReadEntry WAITING_READ_OP_FOR_CLOSED_CURSOR = new OpReadEntry(); + private static final AtomicInteger opReadIdGenerator = new AtomicInteger(1); + /** + * id for this read operation. Value can be negative when integer value overflow happens. + * Used for waitingReadOp consistency so the correct instance is handled after the instance has already been + * recycled. 
+ */ + int id; ManagedCursorImpl cursor; PositionImpl readPosition; private int count; @@ -50,6 +58,7 @@ class OpReadEntry implements ReadEntriesCallback { public static OpReadEntry create(ManagedCursorImpl cursor, PositionImpl readPositionRef, int count, ReadEntriesCallback callback, Object ctx, PositionImpl maxPosition, Predicate<PositionImpl> skipCondition) { OpReadEntry op = RECYCLER.get(); + op.id = opReadIdGenerator.getAndIncrement(); op.readPosition = cursor.ledger.startReadOperationOnLedger(readPositionRef); op.cursor = cursor; op.count = count; @@ -122,7 +131,7 @@ class OpReadEntry implements ReadEntriesCallback { if (!entries.isEmpty()) { // There were already some entries that were read before, we can return them complete(ctx); - } else if (cursor.getConfig().isAutoSkipNonRecoverableData() + } else if (!cursor.isClosed() && cursor.getConfig().isAutoSkipNonRecoverableData() && exception instanceof NonRecoverableLedgerException) { log.warn("[{}][{}] read failed from ledger at position:{} : {}", cursor.ledger.getName(), cursor.getName(), readPosition, exception.getMessage()); @@ -197,6 +206,22 @@ class OpReadEntry implements ReadEntriesCallback { this.recyclerHandle = recyclerHandle; } + // no-op constructor for the WAITING_READ_OP_FOR_CLOSED_CURSOR instance + private OpReadEntry() { + this.recyclerHandle = null; + this.callback = new ReadEntriesCallback() { + @Override + public void readEntriesComplete(List<Entry> entries, Object ctx) { + // no-op + } + + @Override + public void readEntriesFailed(ManagedLedgerException exception, Object ctx) { + // no-op + } + }; + } + private static final Recycler<OpReadEntry> RECYCLER = new Recycler<OpReadEntry>() { @Override protected OpReadEntry newObject(Recycler.Handle<OpReadEntry> recyclerHandle) { @@ -205,6 +230,11 @@ class OpReadEntry implements ReadEntriesCallback { }; public void recycle() { + if (recyclerHandle == null) { + // This is the no-op instance, do not recycle + return; + } + id = -1; count = 0; cursor = null; readPosition = null; 
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ReadOnlyCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ReadOnlyCursorImpl.java index 2461bcf780e..0ad00e91d10 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ReadOnlyCursorImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ReadOnlyCursorImpl.java @@ -59,6 +59,8 @@ public class ReadOnlyCursorImpl extends ManagedCursorImpl implements ReadOnlyCur @Override public void asyncClose(final AsyncCallbacks.CloseCallback callback, final Object ctx) { state = State.Closed; + closeWaitingCursor(); + setInactive(); callback.closeComplete(ctx); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java index 635c0beb02d..86c4251f371 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java @@ -327,6 +327,16 @@ public class PersistentSubscription extends AbstractSubscription { if (dispatcher != null && dispatcher.getConsumers().isEmpty()) { deactivateCursor(); + // Remove the cursor from the waiting cursors list. + // For durable cursors, we should *not* cancel the pending read with cursor.cancelPendingReadRequest. + // This is because internally, in the dispatcher implementations, there is a "havePendingRead" flag + // that is not reset. If the pending read is cancelled, the dispatcher will not continue reading from + // the managed ledger when a new consumer is added to the dispatcher since based on the "havePendingRead" + // state, it will continue to expect that a read is pending and will not submit a new read. 
+ // For non-durable cursors, there's no difference since the cursor is not expected to be used again. + + // remove waiting cursor from the managed ledger, this applies to both durable and non-durable cursors. + topic.getManagedLedger().removeWaitingCursor(cursor); if (!cursor.isDurable()) { // If cursor is not durable, we need to clean up the subscription as well. No need to check for active diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java index 8634d769fea..eefcc0a9f04 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java @@ -55,7 +55,6 @@ import lombok.Cleanup; import org.apache.bookkeeper.mledger.AsyncCallbacks; import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteCursorCallback; import org.apache.bookkeeper.mledger.Entry; -import org.apache.bookkeeper.mledger.ManagedCursor; import org.apache.bookkeeper.mledger.ManagedLedgerException; import org.apache.bookkeeper.mledger.ManagedLedgerException.CursorAlreadyClosedException; import org.apache.bookkeeper.mledger.Position; @@ -1027,7 +1026,7 @@ public class ReplicatorTest extends ReplicatorTestBase { * * @throws Exception */ - @Test(timeOut = 15000) + @Test(timeOut = 30000) public void testCloseReplicatorStartProducer() throws Exception { TopicName dest = TopicName.get(BrokerTestUtil.newUniqueName("persistent://pulsar/ns1/closeCursor")); // Producer on r1 @@ -1044,33 +1043,30 @@ public class ReplicatorTest extends ReplicatorTestBase { PersistentTopic topic = (PersistentTopic) pulsar1.getBrokerService().getTopicReference(dest.toString()).get(); PersistentReplicator replicator = (PersistentReplicator) topic.getPersistentReplicator("r2"); + // check that the replicator producer is not null + Awaitility.await().untilAsserted(() -> { + 
assertNotNull(replicator.getProducer()); + }); + // close the cursor - Field cursorField = PersistentReplicator.class.getDeclaredField("cursor"); - cursorField.setAccessible(true); - ManagedCursor cursor = (ManagedCursor) cursorField.get(replicator); - cursor.close(); - // try to read entries + replicator.getCursor().close(); + + // try to produce entries producer1.produce(10); + // attempt to read entries directly from replicator cursor try { - cursor.readEntriesOrWait(10); + replicator.getCursor().readEntriesOrWait(10); fail("It should have failed"); } catch (Exception e) { assertEquals(e.getClass(), CursorAlreadyClosedException.class); } - // replicator-readException: cursorAlreadyClosed - replicator.readEntriesFailed(new CursorAlreadyClosedException("Cursor already closed exception"), null); - // wait replicator producer to be closed - Thread.sleep(100); - - // Replicator producer must be closed - Field producerField = AbstractReplicator.class.getDeclaredField("producer"); - producerField.setAccessible(true); - @SuppressWarnings("unchecked") - ProducerImpl<byte[]> replicatorProducer = (ProducerImpl<byte[]>) producerField.get(replicator); - assertNull(replicatorProducer); + // Replicator producer must be null after the producer has been closed + Awaitility.await().untilAsserted(() -> { + assertNull(replicator.getProducer()); + }); } @Test(timeOut = 30000)