This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new c324228eb52 [fix][broker] Fix ManagedCursor state management race conditions and lifecycle issues (#24569)
c324228eb52 is described below

commit c324228eb52cd456115fbc580347efc5fc01789f
Author: Lari Hotari <lhot...@users.noreply.github.com>
AuthorDate: Tue Jul 29 14:07:54 2025 +0300

    [fix][broker] Fix ManagedCursor state management race conditions and lifecycle issues (#24569)
    
    (cherry picked from commit c96f27a4dde9e4211a5b83e91f5e384d7ac8d904)
---
 .../bookkeeper/mledger/impl/ManagedCursorImpl.java | 431 ++++++++++++---------
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java |  37 +-
 .../mledger/impl/NonDurableCursorImpl.java         |   2 +
 .../bookkeeper/mledger/impl/OpReadEntry.java       |  34 +-
 .../mledger/impl/ReadOnlyCursorImpl.java           |   2 +
 .../service/persistent/PersistentSubscription.java |  10 +
 .../pulsar/broker/service/ReplicatorTest.java      |  34 +-
 7 files changed, 342 insertions(+), 208 deletions(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index a48fe8a9f9f..170e8f0db1d 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -49,7 +49,6 @@ import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -156,17 +155,10 @@ public class ManagedCursorImpl implements ManagedCursor {
             MarkDeleteEntry.class, "lastMarkDeleteEntry");
     protected volatile MarkDeleteEntry lastMarkDeleteEntry;
 
-    /** Protects the method "asyncReadEntriesWithSkipOrWait" and 
"cancelPendingReadRequest" runs concurrently. **/
-    private final Object pendingReadOpMutex = new Object();
-    /**
-     *  'ManagedLedger.notifyCursors' relies on this CAS to avoid using 
"pendingReadOpMutex" to guarantee thread-safety,
-     *  which improved the performance of publishing messages.
-     */
     protected static final AtomicReferenceFieldUpdater<ManagedCursorImpl, 
OpReadEntry> WAITING_READ_OP_UPDATER =
         AtomicReferenceFieldUpdater.newUpdater(ManagedCursorImpl.class, 
OpReadEntry.class, "waitingReadOp");
     @SuppressWarnings("unused")
     private volatile OpReadEntry waitingReadOp = null;
-    private DelayCheckForNewEntriesTask delayCheckForNewEntriesTask;
 
     public static final int FALSE = 0;
     public static final int TRUE = 1;
@@ -238,6 +230,11 @@ public class ManagedCursorImpl implements ManagedCursor {
     // active state cache in ManagedCursor. It should be in sync with the 
state in activeCursors in ManagedLedger.
     private volatile boolean isActive = false;
 
+    // This is a lock used to update the registration state of the cursor in 
the managed ledger.
+    private final Object registerToWaitingCursorsLock = new Object();
+    // This is used to track if the cursor is registered in the managed 
ledger's waitingCursors queue
+    boolean registeredToWaitingCursors = false;
+
     class MarkDeleteEntry {
         final PositionImpl newPosition;
         final MarkDeleteCallback callback;
@@ -295,17 +292,38 @@ public class ManagedCursorImpl implements ManagedCursor {
     private volatile long lastActive;
 
     public enum State {
-        Uninitialized, // Cursor is being initialized
-        NoLedger, // There is no metadata ledger open for writing
-        Open, // Metadata ledger is ready
-        SwitchingLedger, // The metadata ledger is being switched
-        Closing, // The managed cursor is closing
-        Closed // The managed cursor has been closed
+        Uninitialized(false), // Cursor is being initialized
+        NoLedger(false), // There is no metadata ledger open for writing
+        Open(false), // Metadata ledger is ready
+        SwitchingLedger(false), // The metadata ledger is being switched
+        Closing(true), // The managed cursor is closing
+        Closed(true), // The managed cursor has been closed
+        Deleting(true), // The managed cursor is being deleted
+        Deleted(true), // The managed cursor has been deleted
+        DeletingFailed(true); // The managed cursor deletion failed, state 
allows retrying deletion.
+
+        // Indicate if the cursor is in a state that is considered closed
+        private final boolean closedState;
+
+        State(boolean closedState) {
+            this.closedState = closedState;
+        }
+
+        /**
+         * Returns true if the state is considered closed.
+         */
+        public boolean isClosed() {
+            return closedState;
+        }
+
+        public boolean isDeletingOrDeleted() {
+            return this == Deleting || this == Deleted;
+        }
     }
 
     protected static final AtomicReferenceFieldUpdater<ManagedCursorImpl, 
State> STATE_UPDATER =
         AtomicReferenceFieldUpdater.newUpdater(ManagedCursorImpl.class, 
State.class, "state");
-    protected volatile State state = null;
+    protected volatile State state = State.Uninitialized;
 
     protected final ManagedCursorMXBean mbean;
 
@@ -329,7 +347,6 @@ public class ManagedCursorImpl implements ManagedCursor {
             this.batchDeletedIndexes = null;
         }
         this.digestType = 
BookKeeper.DigestType.fromApiDigestType(getConfig().getDigestType());
-        STATE_UPDATER.set(this, State.Uninitialized);
         PENDING_MARK_DELETED_SUBMITTED_COUNT_UPDATER.set(this, 0);
         PENDING_READ_OPS_UPDATER.set(this, 0);
         RESET_CURSOR_IN_PROGRESS_UPDATER.set(this, FALSE);
@@ -779,7 +796,23 @@ public class ManagedCursorImpl implements ManagedCursor {
         // assign cursor-ledger so, it can be deleted when new ledger will be 
switched
         this.cursorLedger = recoveredFromCursorLedger;
         this.isCursorLedgerReadOnly = true;
-        STATE_UPDATER.set(this, State.NoLedger);
+        changeStateIfNotClosed(State.NoLedger);
+    }
+
+    /**
+     * Change the state of the cursor if it is not already considered closed.
+     * This is to prevent invalid state transitions when the cursor is already 
closed.
+     *
+     * @param newState The new state to set
+     * @return The previous state of the cursor
+     */
+    private State changeStateIfNotClosed(State newState) {
+        return STATE_UPDATER.getAndUpdate(this, current -> {
+            if (current.isClosed()) {
+                return current;
+            }
+            return newState;
+        });
     }
 
     void initialize(PositionImpl position, Map<String, Long> properties, 
Map<String, String> cursorProperties,
@@ -793,7 +826,7 @@ public class ManagedCursorImpl implements ManagedCursor {
                 new MetaStoreCallback<>() {
                     @Override
                     public void operationComplete(Void result, Stat stat) {
-                        STATE_UPDATER.set(ManagedCursorImpl.this, 
State.NoLedger);
+                        changeStateIfNotClosed(State.NoLedger);
                         callback.operationComplete();
                     }
                     @Override
@@ -1028,87 +1061,36 @@ public class ManagedCursorImpl implements ManagedCursor 
{
             skipCondition = skipCondition == null ? this::isMessageDeleted : 
skipCondition.or(this::isMessageDeleted);
             OpReadEntry op = OpReadEntry.create(this, readPosition, 
numberOfEntriesToRead, callback,
                     ctx, maxPosition, skipCondition);
-
-            synchronized (pendingReadOpMutex) {
-                if (!WAITING_READ_OP_UPDATER.compareAndSet(this, null, op)) {
-                    op.recycle();
-                    callback.readEntriesFailed(new 
ManagedLedgerException.ConcurrentWaitCallbackException(), ctx);
-                    return;
-                }
-
-                if (log.isDebugEnabled()) {
-                    log.debug("[{}] [{}] Deferring retry of read at position 
{}", ledger.getName(), name,
-                            op.readPosition);
-                }
-
-                // Check again for new entries after the configured time, then 
if still no entries are available
-                // register to be notified.
-                if (getConfig().getNewEntriesCheckDelayInMillis() > 0) {
-                    delayCheckForNewEntriesTask = new 
DelayCheckForNewEntriesTask(op, callback, ctx);
-                } else {
-                    // If there's no delay, check directly from the same thread
-                    checkForNewEntries(op, callback, ctx);
-                }
+            int opReadId = op.id;
+            if (!WAITING_READ_OP_UPDATER.compareAndSet(this, null, op)) {
+                op.recycle();
+                callback.readEntriesFailed(new 
ManagedLedgerException.ConcurrentWaitCallbackException(), ctx);
+                return;
             }
-        }
-    }
-
-    private enum DelayCheckForNewEntriesTaskState {
-        INIT, RUNNING, CANCELLED, DONE
-    }
-
-    private class DelayCheckForNewEntriesTask implements Runnable {
-
-        private final OpReadEntry op;
-        private final ReadEntriesCallback callback;
-        private final Object ctx;
-        private final ScheduledFuture<?> scheduledFuture;
-        private DelayCheckForNewEntriesTaskState state = 
DelayCheckForNewEntriesTaskState.INIT;
 
-        public DelayCheckForNewEntriesTask(OpReadEntry op, ReadEntriesCallback 
callback, Object ctx) {
-            this.op = op;
-            this.callback = callback;
-            this.ctx = ctx;
-            scheduledFuture = ledger.getScheduledExecutor().schedule(this,
-                    getConfig().getNewEntriesCheckDelayInMillis(), 
TimeUnit.MILLISECONDS);
-        }
-
-        @Override
-        public void run() {
-            synchronized (pendingReadOpMutex) {
-                if (state != DelayCheckForNewEntriesTaskState.INIT) {
-                    return;
-                }
-                state = DelayCheckForNewEntriesTaskState.RUNNING;
-                checkForNewEntries(op, callback, ctx);
-                state = DelayCheckForNewEntriesTaskState.DONE;
+            if (log.isDebugEnabled()) {
+                log.debug("[{}] [{}] Deferring retry of read at position {}", 
ledger.getName(), name, op.readPosition);
             }
-        }
-
-        public boolean isDone() {
-            return state == DelayCheckForNewEntriesTaskState.DONE;
-        }
 
-        public boolean cancel() {
-            synchronized (pendingReadOpMutex) {
-                // Not all implementations of Executor guarantee that the 
Runnable will be no long be executed after a
-                // successful cancel, such as Guava MoreExecutors, see also 
https://github.com/google/guava/blob
-                // 
/v32.1.2/guava/src/com/google/common/util/concurrent/MoreExecutors.java#L709.
-                // The current task guarantees.
-                if (state != DelayCheckForNewEntriesTaskState.INIT) {
-                    return false;
-                }
-                state = DelayCheckForNewEntriesTaskState.CANCELLED;
-                scheduledFuture.cancel(false);
-                return true;
+            // Check again for new entries after the configured time, then if 
still no entries are available register
+            // to be notified
+            if (getConfig().getNewEntriesCheckDelayInMillis() > 0) {
+                ledger.getScheduledExecutor().schedule(() -> 
checkForNewEntries(opReadId, op, callback, ctx),
+                        getConfig().getNewEntriesCheckDelayInMillis(), 
TimeUnit.MILLISECONDS);
+            } else {
+                // If there's no delay, check directly from the same thread
+                checkForNewEntries(opReadId, op, callback, ctx);
             }
         }
     }
 
-    private void checkForNewEntries(OpReadEntry op, ReadEntriesCallback 
callback, Object ctx) {
+    // Please notice that OpReadEntry might be recycled due to sharing via 
waitingReadOp field logic
+    // That's why the fields cannot be accessed before the reference is 
removed from waitingReadOp atomically
+    // and the id matches the removed reference.
+    private void checkForNewEntries(int opReadId, OpReadEntry op, 
ReadEntriesCallback callback, Object ctx) {
         try {
             if (log.isDebugEnabled()) {
-                log.debug("[{}] [{}] Re-trying the read at position {}", 
ledger.getName(), name, op.readPosition);
+                log.debug("[{}] [{}] Re-trying the read for op id {}", 
ledger.getName(), name, opReadId);
             }
 
             if (isClosed()) {
@@ -1137,17 +1119,33 @@ public class ManagedCursorImpl implements ManagedCursor 
{
                     log.debug("[{}] [{}] Found more entries", 
ledger.getName(), name);
                 }
                 // Try to cancel the notification request
-                if (WAITING_READ_OP_UPDATER.compareAndSet(this, op, null)) {
-                    ledger.removeWaitingCursor(this);
+                // Clear the waiting read op only if it matches the current 
instance and the id matches
+                // the opReadId parameter. This avoids recycled OpReadEntry 
instances from matching since their
+                // ids would be different after recycling.
+                OpReadEntry waitingReadOpItem = 
WAITING_READ_OP_UPDATER.getAndUpdate(this,
+                        current -> {
+                            if (current == op && current.id == opReadId) {
+                                // update the value to null to cancel the 
waiting read op
+                                return null;
+                            } else {
+                                // keep the current waiting read op value
+                                return current;
+                            }
+                        });
+                // If the waiting read op was the same as the one we are 
trying to cancel, it means that it was now
+                // cleared from the waitingReadOp field and therefore 
"cancelled"
+                if (waitingReadOpItem == op && waitingReadOpItem.id == 
opReadId) {
                     if (log.isDebugEnabled()) {
                         log.debug("[{}] [{}] Cancelled notification and 
scheduled read at {}", ledger.getName(),
-                            name, op.readPosition);
+                                name, op.readPosition);
                     }
                     PENDING_READ_OPS_UPDATER.incrementAndGet(this);
                     ledger.asyncReadEntries(op);
                 } else {
-                    log.info("[{}] [{}] notification that new entries added 
was already be called, skipped the current"
-                        + " new entry checking", ledger.getName(), name);
+                    if (log.isDebugEnabled()) {
+                        log.debug("[{}] [{}] notification was already 
cancelled for op id {}", ledger.getName(), name,
+                                opReadId);
+                    }
                 }
             } else if (ledger.isTerminated()) {
                 // At this point we registered for notification and still 
there were no more available
@@ -1162,7 +1160,7 @@ public class ManagedCursorImpl implements ManagedCursor {
 
     @Override
     public boolean isClosed() {
-        return state == State.Closed || state == State.Closing;
+        return state.isClosed();
     }
 
     @Override
@@ -1170,53 +1168,21 @@ public class ManagedCursorImpl implements ManagedCursor 
{
         if (log.isDebugEnabled()) {
             log.debug("[{}] [{}] Cancel pending read request", 
ledger.getName(), name);
         }
-        synchronized (pendingReadOpMutex) {
-            final OpReadEntry op = WAITING_READ_OP_UPDATER.get(this);
-            // Case 1: the pending read has executed, or there is no pending 
read.
-            if (op == null) {
-                return false;
-            }
-            Function<Boolean, Boolean> clearWaitingReadOp = 
removeWaitingCursor -> {
-                if (WAITING_READ_OP_UPDATER.compareAndSet(this, op, null)) {
-                    if (removeWaitingCursor) {
-                        ledger.removeWaitingCursor(this);
-                    }
-                    op.recycle();
-                    return true;
-                } else {
-                    // Managed ledger has noticed that new entries was added.
-                    if (WAITING_READ_OP_UPDATER.get(this) == null) {
-                        return false;
-                    }
-                    // It will never occur, it means the lock 
"pendingReadOpMutex" does not work as expected, which
-                    // allowed other thread to modify the "waitingReadOp" 
concurrently.
-                    // The waitingReadOp has been modified to other instance, 
which will never occur.
-                    log.warn("[{}] [{}] Cancel pending request encountered an 
unexpected error, the lock"
-                        + " \"pendingReadOpMutex\" does not work as expected, 
which allowed other"
-                        + " thread to modify the \"waitingReadOp\" 
concurrently..", ledger.getName(), name);
-                    return cancelPendingReadRequest();
-                }
-            };
-            // There is a pending read,
-            // Case 2: delayCheckForNewEntriesTask can be cancelled, no need 
to remove cursor from "ml.waitingCursors",
-            //         because it has not added successfully yet.
-            if (delayCheckForNewEntriesTask != null && 
delayCheckForNewEntriesTask.cancel()) {
-                return clearWaitingReadOp.apply(false);
-            }
-            // Case 3: managedLedgerNewEntriesCheckDelayInMillis is "0".
-            // Case 4: delayCheckForNewEntriesTask has done, which has added 
cursor into "ml.waitingCursors".
-            if (delayCheckForNewEntriesTask == null || 
delayCheckForNewEntriesTask.isDone()) {
-                return clearWaitingReadOp.apply(true);
+        final OpReadEntry op = WAITING_READ_OP_UPDATER.getAndUpdate(this, 
current -> {
+            if (current == OpReadEntry.WAITING_READ_OP_FOR_CLOSED_CURSOR) {
+                return current;
             }
-            // Case 5: delayCheckForNewEntriesTask is running, but not done, 
which only occurs at a corner case. It
-            // only happens when the task is starting. Calling 
"cancelPendingReadRequest" here will release the lock
-            // "pendingReadOpMutex" and let the task go ahead.
-            return cancelPendingReadRequest();
+            return null;
+        });
+        if (op != null) {
+            op.recycle();
         }
+        return op != null && op != 
OpReadEntry.WAITING_READ_OP_FOR_CLOSED_CURSOR;
     }
 
     public boolean hasPendingReadRequest() {
-        return WAITING_READ_OP_UPDATER.get(this) != null;
+        OpReadEntry opReadEntry = WAITING_READ_OP_UPDATER.get(this);
+        return opReadEntry != null && opReadEntry != 
OpReadEntry.WAITING_READ_OP_FOR_CLOSED_CURSOR;
     }
 
     @Override
@@ -2234,7 +2200,7 @@ public class ManagedCursorImpl implements ManagedCursor {
         // We cannot write to the ledger during the switch, need to wait until 
the new metadata ledger is available
         synchronized (pendingMarkDeleteOps) {
             // The state might have changed while we were waiting on the queue 
mutex
-            switch (STATE_UPDATER.get(this)) {
+            switch (state) {
             case Closed:
                 callback.markDeleteFailed(new ManagedLedgerException
                         .CursorAlreadyClosedException("Cursor was already 
closed"), ctx);
@@ -2365,7 +2331,7 @@ public class ManagedCursorImpl implements ManagedCursor {
             }
         };
 
-        if (State.NoLedger.equals(STATE_UPDATER.get(this))) {
+        if (state == State.NoLedger) {
             if (ledger.isNoMessagesAfterPos(mdEntry.newPosition)) {
                 log.error("[{}][{}] Metadata ledger creation failed, try to 
persist the position in the metadata"
                         + " store.", ledger.getName(), name);
@@ -2374,7 +2340,7 @@ public class ManagedCursorImpl implements ManagedCursor {
                 cb.operationFailed(new ManagedLedgerException("Switch new 
cursor ledger failed"));
             }
         } else {
-            persistPositionToLedger(cursorLedger, mdEntry, cb);
+            persistPositionToLedger(cursorLedger, mdEntry, cb, false);
         }
     }
 
@@ -2819,7 +2785,7 @@ public class ManagedCursorImpl implements ManagedCursor {
                                     ledger.getName(), name, 
cursorLedger.getId(), e.getMessage());
                             callback.closeFailed(e, ctx);
                         }
-                    });
+                    }, true);
         } else {
             persistPositionMetaStore(-1, position, properties, new 
MetaStoreCallback<Void>() {
                 @Override
@@ -2946,12 +2912,18 @@ public class ManagedCursorImpl implements ManagedCursor 
{
             callback.closeComplete(ctx);
             return;
         }
+        closeWaitingCursor();
+        setInactive();
         persistPositionWhenClosing(lastMarkDeleteEntry.newPosition, 
lastMarkDeleteEntry.properties,
                 new AsyncCallbacks.CloseCallback(){
 
                     @Override
                     public void closeComplete(Object ctx) {
-                        STATE_UPDATER.set(ManagedCursorImpl.this, 
State.Closed);
+                        if 
(!STATE_UPDATER.compareAndSet(ManagedCursorImpl.this, State.Closing, 
State.Closed)) {
+                            log.warn("[{}] [{}] State was modified from 
closing to {} while closing", ledger.getName(),
+                                    name, state);
+                            state = State.Closed;
+                        }
                         callback.closeComplete(ctx);
                     }
 
@@ -2964,6 +2936,19 @@ public class ManagedCursorImpl implements ManagedCursor {
                 }, ctx);
     }
 
+    protected void closeWaitingCursor() {
+        synchronized (registerToWaitingCursorsLock) {
+            if (registeredToWaitingCursors) {
+                ledger.removeWaitingCursor(this);
+            }
+        }
+        OpReadEntry opReadEntry = WAITING_READ_OP_UPDATER.getAndSet(this,
+                OpReadEntry.WAITING_READ_OP_FOR_CLOSED_CURSOR);
+        if (opReadEntry != null && opReadEntry != 
OpReadEntry.WAITING_READ_OP_FOR_CLOSED_CURSOR) {
+            opReadEntry.readEntriesFailed(new 
CursorAlreadyClosedException("Cursor is closing"), opReadEntry.ctx);
+        }
+    }
+
     /**
      * Internal version of seek that doesn't do the validation check.
      *
@@ -3015,8 +3000,8 @@ public class ManagedCursorImpl implements ManagedCursor {
 
     void startCreatingNewMetadataLedger() {
         // Change the state so that new mark-delete ops will be queued and not 
immediately submitted
-        State oldState = STATE_UPDATER.getAndSet(this, State.SwitchingLedger);
-        if (oldState == State.SwitchingLedger) {
+        State oldState = changeStateIfNotClosed(State.SwitchingLedger);
+        if (oldState == State.SwitchingLedger || oldState.isClosed()) {
             // Ignore double request
             return;
         }
@@ -3036,7 +3021,7 @@ public class ManagedCursorImpl implements ManagedCursor {
                     flushPendingMarkDeletes();
 
                     // Resume normal mark-delete operations
-                    STATE_UPDATER.set(ManagedCursorImpl.this, State.Open);
+                    changeStateIfNotClosed(State.Open);
                 }
             }
 
@@ -3045,7 +3030,7 @@ public class ManagedCursorImpl implements ManagedCursor {
                 log.error("[{}][{}] Metadata ledger creation failed {}", 
ledger.getName(), name, exception);
                 synchronized (pendingMarkDeleteOps) {
                     // At this point we don't have a ledger ready
-                    STATE_UPDATER.set(ManagedCursorImpl.this, State.NoLedger);
+                    changeStateIfNotClosed(State.NoLedger);
                     // There are two case may cause switch ledger fails.
                     // 1. No enough BKs; BKs are in read-only mode...
                     // 2. Write ZK fails.
@@ -3069,21 +3054,8 @@ public class ManagedCursorImpl implements ManagedCursor {
      * @return false if the {@link #state} already is {@link State#Closing} or 
{@link State#Closed}.
      */
     private boolean trySetStateToClosing() {
-        final AtomicBoolean notClosing = new AtomicBoolean(false);
-        STATE_UPDATER.updateAndGet(this, state -> {
-            switch (state){
-                case Closing:
-                case Closed: {
-                    notClosing.set(false);
-                    return state;
-                }
-                default: {
-                    notClosing.set(true);
-                    return State.Closing;
-                }
-            }
-        });
-        return notClosing.get();
+        State previousState = changeStateIfNotClosed(State.Closing);
+        return !previousState.isClosed();
     }
 
     private void flushPendingMarkDeletes() {
@@ -3126,7 +3098,7 @@ public class ManagedCursorImpl implements ManagedCursor {
                     deleteLedgerAsync(newLedgerHandle);
                     callback.operationFailed(exception);
                 }
-            });
+            }, false);
         }).whenComplete((result, e) -> {
             ledger.mbean.endCursorLedgerCreateOp();
             if (e != null) {
@@ -3285,7 +3257,8 @@ public class ManagedCursorImpl implements ManagedCursor {
         }
     }
 
-    void persistPositionToLedger(final LedgerHandle lh, MarkDeleteEntry 
mdEntry, final VoidCallback callback) {
+    void persistPositionToLedger(final LedgerHandle lh, MarkDeleteEntry 
mdEntry, final VoidCallback callback,
+                                 boolean ignoreClosedStateAfterFailure) {
         PositionImpl position = mdEntry.newPosition;
         Builder piBuilder = 
PositionInfo.newBuilder().setLedgerId(position.getLedgerId())
                 .setEntryId(position.getEntryId())
@@ -3344,7 +3317,7 @@ public class ManagedCursorImpl implements ManagedCursor {
                 mbean.addWriteCursorLedgerSize(data.length);
                 callback.operationComplete();
             } else {
-                if (state == State.Closed) {
+                if (!ignoreClosedStateAfterFailure && state.isClosed()) {
                     // After closed the cursor, the in-progress persistence 
task will get a
                     // BKException.Code.LedgerClosedException.
                     callback.operationFailed(new 
CursorAlreadyClosedException(String.format("%s %s skipped this"
@@ -3395,7 +3368,7 @@ public class ManagedCursorImpl implements ManagedCursor {
         if (ledger.getFactory().isMetadataServiceAvailable()
                 && (lh.getLastAddConfirmed() >= 
getConfig().getMetadataMaxEntriesPerLedger()
                 || lastLedgerSwitchTimestamp < (now - 
getConfig().getLedgerRolloverTimeout() * 1000))
-                && (STATE_UPDATER.get(this) != State.Closed && 
STATE_UPDATER.get(this) != State.Closing)) {
+                && !state.isClosed()) {
             // It's safe to modify the timestamp since this method will be 
only called from a callback, implying that
             // calls will be serialized on one single thread
             lastLedgerSwitchTimestamp = now;
@@ -3443,7 +3416,21 @@ public class ManagedCursorImpl implements ManagedCursor {
         if (log.isDebugEnabled()) {
             log.debug("[{}] [{}] Received ml notification", ledger.getName(), 
name);
         }
-        OpReadEntry opReadEntry = WAITING_READ_OP_UPDATER.getAndSet(this, 
null);
+
+        OpReadEntry opReadEntry = WAITING_READ_OP_UPDATER.getAndUpdate(this, 
current -> {
+            // if the waitingReadOp is WAITING_READ_OP_FOR_CLOSED_CURSOR, keep 
it as is
+            if (current == OpReadEntry.WAITING_READ_OP_FOR_CLOSED_CURSOR) {
+                return current;
+            } else {
+                // Otherwise, clear the waiting read operation
+                return null;
+            }
+        });
+
+        // ignore the notification if the cursor is already closed
+        if (opReadEntry == OpReadEntry.WAITING_READ_OP_FOR_CLOSED_CURSOR) {
+            return;
+        }
 
         if (opReadEntry != null) {
             if (log.isDebugEnabled()) {
@@ -3452,7 +3439,15 @@ public class ManagedCursorImpl implements ManagedCursor {
                 log.debug("[{}] Consumer {} cursor notification: other 
counters: consumed {} mdPos {} rdPos {}",
                         ledger.getName(), name, messagesConsumedCounter, 
markDeletePosition, readPosition);
             }
-
+            if (isClosed()) {
+                // If the cursor is closed, we should not read any more entries
+                if (log.isDebugEnabled()) {
+                    log.debug("[{}] [{}] Cursor is already closed, ignoring 
notification", ledger.getName(), name);
+                }
+                opReadEntry.readEntriesFailed(new 
ManagedLedgerException.CursorAlreadyClosedException(
+                        "Cursor was already closed"), opReadEntry.ctx);
+                return;
+            }
             PENDING_READ_OPS_UPDATER.incrementAndGet(this);
             opReadEntry.readPosition = (PositionImpl) getReadPosition();
             ledger.asyncReadEntries(opReadEntry);
@@ -3487,7 +3482,6 @@ public class ManagedCursorImpl implements ManagedCursor {
 
     void decrementPendingMarkDeleteCount() {
         if (PENDING_MARK_DELETED_SUBMITTED_COUNT_UPDATER.decrementAndGet(this) 
== 0) {
-            final State state = STATE_UPDATER.get(this);
             if (state == State.SwitchingLedger) {
                 // A metadata ledger switch was pending and now we can do it 
since we don't have any more
                 // outstanding mark-delete requests
@@ -3499,7 +3493,7 @@ public class ManagedCursorImpl implements ManagedCursor {
     void readOperationCompleted() {
         if (PENDING_READ_OPS_UPDATER.decrementAndGet(this) == 0) {
             synchronized (pendingMarkDeleteOps) {
-                if (STATE_UPDATER.get(this) == State.Open) {
+                if (state == State.Open) {
                     // Flush the pending writes only if the state is open.
                     flushPendingMarkDeletes();
                 } else if 
(PENDING_MARK_DELETED_SUBMITTED_COUNT_UPDATER.get(this) != 0) {
@@ -3546,13 +3540,25 @@ public class ManagedCursorImpl implements ManagedCursor 
{
     }
 
     private void asyncDeleteCursorLedger(int retry) {
-        STATE_UPDATER.set(this, State.Closed);
+        State beforeChangingState = changeStateToDeletingIfNotDeleted();
+        if (beforeChangingState == State.Deleted) {
+            log.warn("[{}-{}] Cursor ledger is already deleted. state={}", 
ledger.getName(), name,
+                    beforeChangingState);
+            return;
+        }
 
-        if (cursorLedger == null || retry <= 0) {
-            if (cursorLedger != null) {
-                log.warn("[{}-{}] Failed to delete ledger after retries {}", 
ledger.getName(), name,
-                        cursorLedger.getId());
-            }
+        closeWaitingCursor();
+
+        if (cursorLedger == null) {
+            log.warn("[{}-{}] There's no cursor ledger available for 
deletion.", ledger.getName(), name);
+            state = State.DeletingFailed;
+            return;
+        }
+
+        if (retry <= 0) {
+            log.warn("[{}-{}] Failed to delete ledger after retries {}", 
ledger.getName(), name,
+                    cursorLedger.getId());
+            state = State.DeletingFailed;
             return;
         }
 
@@ -3560,18 +3566,36 @@ public class ManagedCursorImpl implements ManagedCursor 
{
         bookkeeper.asyncDeleteLedger(cursorLedger.getId(), (rc, ctx) -> {
             ledger.mbean.endCursorLedgerDeleteOp();
             if (rc == BKException.Code.OK) {
+                state = State.Deleted;
                 log.info("[{}][{}] Deleted cursor ledger {}", 
ledger.getName(), name, cursorLedger.getId());
             } else {
                 log.warn("[{}][{}] Failed to delete ledger {}: {}", 
ledger.getName(), name, cursorLedger.getId(),
                         BKException.getMessage(rc));
                 if (!isNoSuchLedgerExistsException(rc)) {
+                    state = State.DeletingFailed;
                     ledger.getScheduledExecutor().schedule(() -> 
asyncDeleteCursorLedger(retry - 1),
                             DEFAULT_LEDGER_DELETE_BACKOFF_TIME_SEC, 
TimeUnit.SECONDS);
+                } else {
+                    state = State.Deleted;
                 }
             }
         }, null);
     }
 
+    /**
+     * Change the state to {@link State#Deleting} if the current state is not 
{@link State#Deleted}.
+     * @return The state before changing.
+     */
+    State changeStateToDeletingIfNotDeleted() {
+        return STATE_UPDATER.getAndUpdate(this, current -> {
+            // don't change the state if it's already deleted
+            if (current == State.Deleted) {
+                return current;
+            }
+            return State.Deleting;
+        });
+    }
+
     /**
      * return BK error codes that are considered not likely to be recoverable.
      */
@@ -3718,7 +3742,7 @@ public class ManagedCursorImpl implements ManagedCursor {
     }
 
     public String getState() {
-        return STATE_UPDATER.get(this).toString();
+        return state.toString();
     }
 
     @Override
@@ -3840,8 +3864,8 @@ public class ManagedCursorImpl implements ManagedCursor {
     }
 
     @VisibleForTesting
-    public void setState(State state) {
-        this.state = state;
+    public State getAndSetState(State state) {
+        return STATE_UPDATER.getAndSet(this, state);
     }
 
     public void setCacheReadEntry(boolean cacheReadEntry) {
@@ -3887,4 +3911,55 @@ public class ManagedCursorImpl implements ManagedCursor {
         }
         return newNonDurableCursor;
     }
+
+    /**
+     * Called by ManagedLedgerImpl to execute the Runnable inside the lock to 
remove the cursor from its
+     * waiting cursors list.
+     * The cursor state is set to unregistered, and it can be registered again 
for waiting in ManagedLedgerImpl.
+     */
+    void removeWaitingCursorRequested(Runnable removeWaitingCursorRunnable) {
+        synchronized (registerToWaitingCursorsLock) {
+            if (!registeredToWaitingCursors) {
+                // The cursor hasn't been registered, do not attempt to remove
+                if (log.isDebugEnabled()) {
+                    log.debug("[{}] Skipping removing cursor {} from waiting 
cursors since it's not registered.",
+                            ledger.getName(), name);
+                }
+                return;
+            }
+            if (log.isDebugEnabled()) {
+                log.debug("[{}] Removing cursor {} from waiting cursors", 
ledger.getName(), name);
+            }
+            removeWaitingCursorRunnable.run();
+            registeredToWaitingCursors = false;
+        }
+    }
+
+    /**
+     * Called by ManagedLedgerImpl to notify that the cursor has been dequeued 
from the waiting cursors list.
+     */
+    void notifyWaitingCursorDequeued() {
+        synchronized (registerToWaitingCursorsLock) {
+            registeredToWaitingCursors = false;
+        }
+    }
+
+    /**
+     * Called by ManagedLedgerImpl to execute the Runnable inside the lock to 
add the cursor to its
+     * waiting cursors list.
+     * This method is used to ensure that the cursor is not already 
registered, which would result in duplicates.
+     */
+    void addWaitingCursorRequested(Runnable addWaitingCursorRunnable) {
+        synchronized (registerToWaitingCursorsLock) {
+            if (registeredToWaitingCursors || isClosed()) {
+                // The cursor is already registered or closed, do not register 
again.
+                return;
+            }
+            if (log.isDebugEnabled()) {
+                log.debug("[{}] Adding cursor {} to waiting cursors", 
ledger.getName(), name);
+            }
+            addWaitingCursorRunnable.run();
+            registeredToWaitingCursors = true;
+        }
+    }
 }
diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 43c224e7875..869f60cad31 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -1054,15 +1054,29 @@ public class ManagedLedgerImpl implements 
ManagedLedger, CreateCallback {
             callback.deleteCursorFailed(new 
ManagedLedgerException.CursorNotFoundException("ManagedCursor not found: "
                     + consumerName), ctx);
             return;
-        } else if (!cursor.isDurable()) {
-            cursor.setState(ManagedCursorImpl.State.Closed);
-            cursor.cancelPendingReadRequest();
+        }
+
+        // Non-durable cursors can be closed and removed immediately
+        if (!cursor.isDurable()) {
+            try {
+                cursor.close();
+            } catch (Exception e) {
+                log.warn("[{}] Failed to close non-durable cursor {}", name, 
consumerName, e);
+            }
             cursors.removeCursor(consumerName);
-            deactivateCursorByName(consumerName);
             callback.deleteCursorComplete(ctx);
             return;
         }
 
+        // If the cursor is active, we need to deactivate it first
+        cursor.setInactive();
+        // Set the state to deleting (which is a closed state) to avoid any 
new writes
+        ManagedCursorImpl.State beforeChangingState = 
cursor.changeStateToDeletingIfNotDeleted();
+        if (beforeChangingState.isDeletingOrDeleted()) {
+            log.warn("[{}] [{}] Cursor is already being deleted or has been 
deleted.", name, consumerName);
+            return;
+        }
+
         // First remove the consumer form the MetaStore. If this operation 
succeeds and the next one (removing the
         // ledger from BK) don't, we end up having a loose ledger leaked but 
the state will be consistent.
         store.asyncRemoveCursor(ManagedLedgerImpl.this.name, consumerName, new 
MetaStoreCallback<Void>() {
@@ -1070,7 +1084,6 @@ public class ManagedLedgerImpl implements ManagedLedger, 
CreateCallback {
             public void operationComplete(Void result, Stat stat) {
                 cursor.asyncDeleteCursorLedger();
                 cursors.removeCursor(consumerName);
-                deactivateCursorByName(consumerName);
 
                 trimConsumedLedgersInBackground();
 
@@ -1080,7 +1093,7 @@ public class ManagedLedgerImpl implements ManagedLedger, 
CreateCallback {
 
             @Override
             public void operationFailed(MetaStoreException e) {
-                handleBadVersion(e);
+                cursor.getAndSetState(ManagedCursorImpl.State.DeletingFailed);
                 callback.deleteCursorFailed(e, ctx);
             }
 
@@ -2442,7 +2455,7 @@ public class ManagedLedgerImpl implements ManagedLedger, 
CreateCallback {
             if (waitingCursor == null) {
                 break;
             }
-
+            waitingCursor.notifyWaitingCursorDequeued();
             executor.execute(waitingCursor::notifyEntriesAvailable);
         }
     }
@@ -3930,11 +3943,17 @@ public class ManagedLedgerImpl implements 
ManagedLedger, CreateCallback {
 
 
     public void removeWaitingCursor(ManagedCursor cursor) {
-        this.waitingCursors.remove(cursor);
+        ((ManagedCursorImpl) cursor).removeWaitingCursorRequested(() -> {
+            // remove only if the cursor has been registered
+            this.waitingCursors.remove(cursor);
+        });
     }
 
     public void addWaitingCursor(ManagedCursorImpl cursor) {
-        this.waitingCursors.add(cursor);
+        cursor.addWaitingCursorRequested(() -> {
+            // add only if the cursor has not been registered
+            this.waitingCursors.add(cursor);
+        });
     }
 
     public boolean isCursorActive(ManagedCursor cursor) {
diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorImpl.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorImpl.java
index 734eab20bc5..7dbf5400da1 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorImpl.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorImpl.java
@@ -111,6 +111,8 @@ public class NonDurableCursorImpl extends ManagedCursorImpl 
{
     @Override
     public void asyncClose(CloseCallback callback, Object ctx) {
         STATE_UPDATER.set(this, State.Closed);
+        closeWaitingCursor();
+        setInactive();
         callback.closeComplete(ctx);
     }
 
diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java
index a250d259553..20c7742129b 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java
@@ -22,6 +22,7 @@ import io.netty.util.Recycler;
 import io.netty.util.Recycler.Handle;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Predicate;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback;
 import org.apache.bookkeeper.mledger.Entry;
@@ -33,7 +34,14 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 class OpReadEntry implements ReadEntriesCallback {
-
+    static final OpReadEntry WAITING_READ_OP_FOR_CLOSED_CURSOR = new 
OpReadEntry();
+    private static final AtomicInteger opReadIdGenerator = new 
AtomicInteger(1);
+    /**
+     * id for this read operation. Value can be negative when integer value 
overflow happens.
+     * Used for waitingReadOp consistency so that the correct instance is 
handled after the instance has already been
+     * recycled.
+     */
+    int id;
     ManagedCursorImpl cursor;
     PositionImpl readPosition;
     private int count;
@@ -50,6 +58,7 @@ class OpReadEntry implements ReadEntriesCallback {
     public static OpReadEntry create(ManagedCursorImpl cursor, PositionImpl 
readPositionRef, int count,
             ReadEntriesCallback callback, Object ctx, PositionImpl 
maxPosition, Predicate<PositionImpl> skipCondition) {
         OpReadEntry op = RECYCLER.get();
+        op.id = opReadIdGenerator.getAndIncrement();
         op.readPosition = 
cursor.ledger.startReadOperationOnLedger(readPositionRef);
         op.cursor = cursor;
         op.count = count;
@@ -122,7 +131,7 @@ class OpReadEntry implements ReadEntriesCallback {
         if (!entries.isEmpty()) {
             // There were already some entries that were read before, we can 
return them
             complete(ctx);
-        } else if (cursor.getConfig().isAutoSkipNonRecoverableData()
+        } else if (!cursor.isClosed() && 
cursor.getConfig().isAutoSkipNonRecoverableData()
                 && exception instanceof NonRecoverableLedgerException) {
             log.warn("[{}][{}] read failed from ledger at position:{} : {}", 
cursor.ledger.getName(), cursor.getName(),
                     readPosition, exception.getMessage());
@@ -197,6 +206,22 @@ class OpReadEntry implements ReadEntriesCallback {
         this.recyclerHandle = recyclerHandle;
     }
 
+    // no-op constructor for the WAITING_READ_OP_FOR_CLOSED_CURSOR instance
+    private OpReadEntry() {
+        this.recyclerHandle = null;
+        this.callback = new ReadEntriesCallback() {
+            @Override
+            public void readEntriesComplete(List<Entry> entries, Object ctx) {
+                // no-op
+            }
+
+            @Override
+            public void readEntriesFailed(ManagedLedgerException exception, 
Object ctx) {
+                // no-op
+            }
+        };
+    }
+
     private static final Recycler<OpReadEntry> RECYCLER = new 
Recycler<OpReadEntry>() {
         @Override
         protected OpReadEntry newObject(Recycler.Handle<OpReadEntry> 
recyclerHandle) {
@@ -205,6 +230,11 @@ class OpReadEntry implements ReadEntriesCallback {
     };
 
     public void recycle() {
+        if (recyclerHandle == null) {
+            // This is the no-op instance, do not recycle
+            return;
+        }
+        id = -1;
         count = 0;
         cursor = null;
         readPosition = null;
diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ReadOnlyCursorImpl.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ReadOnlyCursorImpl.java
index 2461bcf780e..0ad00e91d10 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ReadOnlyCursorImpl.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ReadOnlyCursorImpl.java
@@ -59,6 +59,8 @@ public class ReadOnlyCursorImpl extends ManagedCursorImpl 
implements ReadOnlyCur
     @Override
     public void asyncClose(final AsyncCallbacks.CloseCallback callback, final 
Object ctx) {
         state = State.Closed;
+        closeWaitingCursor();
+        setInactive();
         callback.closeComplete(ctx);
     }
 
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
index 2a27090a990..24534f0aaa7 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
@@ -339,6 +339,16 @@ public class PersistentSubscription extends 
AbstractSubscription implements Subs
 
         if (dispatcher != null && dispatcher.getConsumers().isEmpty()) {
             deactivateCursor();
+            // Remove the cursor from the waiting cursors list.
+            // For durable cursors, we should *not* cancel the pending read 
with cursor.cancelPendingReadRequest.
+            // This is because internally, in the dispatcher implementations, 
there is a "havePendingRead" flag
+            // that is not reset. If the pending read is cancelled, the 
dispatcher will not continue reading from
+            // the managed ledger when a new consumer is added to the 
dispatcher since based on the "havePendingRead"
+            // state, it will continue to expect that a read is pending and 
will not submit a new read.
+            // For non-durable cursors, there's no difference since the cursor 
is not expected to be used again.
+
+            // remove waiting cursor from the managed ledger, this applies to 
both durable and non-durable cursors.
+            topic.getManagedLedger().removeWaitingCursor(cursor);
 
             if (!cursor.isDurable()) {
                 // If cursor is not durable, we need to clean up the 
subscription as well
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
index 75c418953e3..5820ef3d8e4 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
@@ -55,7 +55,6 @@ import lombok.Cleanup;
 import org.apache.bookkeeper.mledger.AsyncCallbacks;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteCursorCallback;
 import org.apache.bookkeeper.mledger.Entry;
-import org.apache.bookkeeper.mledger.ManagedCursor;
 import org.apache.bookkeeper.mledger.ManagedLedgerException;
 import 
org.apache.bookkeeper.mledger.ManagedLedgerException.CursorAlreadyClosedException;
 import org.apache.bookkeeper.mledger.Position;
@@ -997,7 +996,7 @@ public class ReplicatorTest extends ReplicatorTestBase {
      *
      * @throws Exception
      */
-    @Test(timeOut = 15000)
+    @Test(timeOut = 30000)
     public void testCloseReplicatorStartProducer() throws Exception {
         TopicName dest = 
TopicName.get(BrokerTestUtil.newUniqueName("persistent://pulsar/ns1/closeCursor"));
         // Producer on r1
@@ -1014,33 +1013,30 @@ public class ReplicatorTest extends ReplicatorTestBase {
         PersistentTopic topic = (PersistentTopic) 
pulsar1.getBrokerService().getTopicReference(dest.toString()).get();
         PersistentReplicator replicator = (PersistentReplicator) 
topic.getPersistentReplicator("r2");
 
+        // check that the replicator producer is not null
+        Awaitility.await().untilAsserted(() -> {
+            assertNotNull(replicator.getProducer());
+        });
+
         // close the cursor
-        Field cursorField = 
PersistentReplicator.class.getDeclaredField("cursor");
-        cursorField.setAccessible(true);
-        ManagedCursor cursor = (ManagedCursor) cursorField.get(replicator);
-        cursor.close();
-        // try to read entries
+        replicator.getCursor().close();
+
+        // try to produce entries
         producer1.produce(10);
 
+        // attempt to read entries directly from replicator cursor
         try {
-            cursor.readEntriesOrWait(10);
+            replicator.getCursor().readEntriesOrWait(10);
             fail("It should have failed");
         } catch (Exception e) {
             assertEquals(e.getClass(), CursorAlreadyClosedException.class);
         }
 
-        // replicator-readException: cursorAlreadyClosed
-        replicator.readEntriesFailed(new CursorAlreadyClosedException("Cursor 
already closed exception"), null);
-
         // wait replicator producer to be closed
-        Thread.sleep(100);
-
-        // Replicator producer must be closed
-        Field producerField = 
AbstractReplicator.class.getDeclaredField("producer");
-        producerField.setAccessible(true);
-        @SuppressWarnings("unchecked")
-        ProducerImpl<byte[]> replicatorProducer = (ProducerImpl<byte[]>) 
producerField.get(replicator);
-        assertNull(replicatorProducer);
+        // Replicator producer must be null after the producer has been closed
+        Awaitility.await().untilAsserted(() -> {
+            assertNull(replicator.getProducer());
+        });
     }
 
     @Test(timeOut = 30000)

Reply via email to