This is an automated email from the ASF dual-hosted git repository.

yubiao pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 7a614c088d00b9dfbfe8a59fb727508b436be3bb
Author: fengyubiao <[email protected]>
AuthorDate: Tue Apr 23 09:23:08 2024 +0800

    [fix] [broker] Part-1: Replicator cannot be created successfully due to an orphan replicator in the previous topic owner (#21946)
    
    (cherry picked from commit 49240522f543eea0e9307811c92b487eabe431d9)
---
 .../pulsar/broker/service/AbstractReplicator.java  | 332 ++++++++++++++++-----
 .../pulsar/broker/service/BrokerService.java       |   2 +-
 .../apache/pulsar/broker/service/Replicator.java   |   4 +-
 .../nonpersistent/NonPersistentReplicator.java     |   5 +-
 .../service/nonpersistent/NonPersistentTopic.java  |  10 +-
 .../service/persistent/PersistentReplicator.java   |  87 +++---
 .../broker/service/persistent/PersistentTopic.java |  31 +-
 .../broker/service/AbstractReplicatorTest.java     |  22 +-
 .../broker/service/OneWayReplicatorTest.java       | 276 ++++++++++++++++-
 .../broker/service/OneWayReplicatorTestBase.java   |  40 ++-
 .../pulsar/broker/service/PersistentTopicTest.java |   6 +-
 .../pulsar/broker/service/ReplicatorTest.java      |  11 +-
 12 files changed, 656 insertions(+), 170 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java
index 1b5b2824257..f34144deb0a 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java
@@ -18,16 +18,22 @@
  */
 package org.apache.pulsar.broker.service;
 
+import com.google.common.annotations.VisibleForTesting;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+import lombok.Getter;
 import org.apache.bookkeeper.mledger.Position;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pulsar.broker.PulsarServerException;
 import org.apache.pulsar.broker.service.BrokerServiceException.NamingException;
 import 
org.apache.pulsar.broker.service.BrokerServiceException.TopicBusyException;
+import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.client.api.MessageRoutingMode;
+import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.ProducerBuilder;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.impl.Backoff;
@@ -39,7 +45,7 @@ import org.apache.pulsar.common.util.StringInterner;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public abstract class AbstractReplicator {
+public abstract class AbstractReplicator implements Replicator {
 
     protected final BrokerService brokerService;
     protected final String localTopicName;
@@ -64,10 +70,31 @@ public abstract class AbstractReplicator {
 
     protected static final AtomicReferenceFieldUpdater<AbstractReplicator, 
State> STATE_UPDATER =
             AtomicReferenceFieldUpdater.newUpdater(AbstractReplicator.class, 
State.class, "state");
-    private volatile State state = State.Stopped;
-
-    protected enum State {
-        Stopped, Starting, Started, Stopping
+    @VisibleForTesting
+    @Getter
+    protected volatile State state = State.Disconnected;
+
+    public enum State {
+        /**
+         * This value has two meanings:
+         *   Init: the replicator has just been created and has not been started yet.
+         *   Disconnected: the producer was closed after {@link PersistentTopic#checkGC} called disconnect.
+         */
+        // The internal producer is disconnected.
+        Disconnected,
+        // Trying to create a new internal producer.
+        Starting,
+        // The internal producer has started, and the replicator is trying to copy data.
+        Started,
+        /**
+         * The producer is being closed after {@link PersistentTopic#checkGC} called disconnect.
+         */
+        // The internal producer is trying to disconnect.
+        Disconnecting,
+        // The replicator is being terminated.
+        Terminating,
+        // The replicator will never be used again. Pulsar creates a new Replicator when replication is re-enabled.
+        Terminated;
     }
 
     public AbstractReplicator(String localCluster, Topic localTopic, String 
remoteCluster, String remoteTopicName,
@@ -96,16 +123,16 @@ public abstract class AbstractReplicator {
                 .sendTimeout(0, TimeUnit.SECONDS) //
                 .maxPendingMessages(producerQueueSize) //
                 .producerName(getProducerName());
-        STATE_UPDATER.set(this, State.Stopped);
+        STATE_UPDATER.set(this, State.Disconnected);
     }
 
     protected abstract String getProducerName();
 
-    protected abstract void 
readEntries(org.apache.pulsar.client.api.Producer<byte[]> producer);
+    protected abstract void 
setProducerAndTriggerReadEntries(org.apache.pulsar.client.api.Producer<byte[]> 
producer);
 
     protected abstract Position getReplicatorReadPosition();
 
-    protected abstract long getNumberOfEntriesInBacklog();
+    public abstract long getNumberOfEntriesInBacklog();
 
     protected abstract void disableReplicatorRead();
 
@@ -113,66 +140,121 @@ public abstract class AbstractReplicator {
         return remoteCluster;
     }
 
-    // This method needs to be synchronized with disconnects else if there is a disconnect followed by startProducer
-    // the end result can be disconnect.
-    public synchronized void startProducer() {
-        if (STATE_UPDATER.get(this) == State.Stopping) {
-            long waitTimeMs = backOff.next();
-            if (log.isDebugEnabled()) {
-                log.debug(
-                        "[{}] waiting for producer to close before attempting to reconnect, retrying in {} s",
-                        replicatorId, waitTimeMs / 1000.0);
-            }
-            // BackOff before retrying
-            brokerService.executor().schedule(this::checkTopicActiveAndRetryStartProducer, waitTimeMs,
-                    TimeUnit.MILLISECONDS);
-            return;
-        }
-        State state = STATE_UPDATER.get(this);
-        if (!STATE_UPDATER.compareAndSet(this, State.Stopped, State.Starting)) {
-            if (state == State.Started) {
-                // Already running
+    public void startProducer() {
+        // Only the task that wins this CAS calls "producerBuilder.createAsync()"; logs report the observed state.
+        Pair<Boolean, State> setStartingRes = compareSetAndGetState(State.Disconnected, State.Starting);
+        if (!setStartingRes.getLeft()) {
+            if (setStartingRes.getRight() == State.Starting) {
+                log.info("[{}] Skip the producer creation since other thread is doing starting, state : {}",
+                        replicatorId, setStartingRes.getRight());
+            } else if (setStartingRes.getRight() == State.Started) {
+                // Since the method "startProducer" will be called even if it is started, only print debug-level log.
+                if (log.isDebugEnabled()) {
+                    log.debug("[{}] Replicator was already running. state: {}", replicatorId,
+                            setStartingRes.getRight());
+                }
+            } else if (setStartingRes.getRight() == State.Disconnecting) {
                 if (log.isDebugEnabled()) {
-                    log.debug("[{}] Replicator was already running", replicatorId);
+                    log.debug("[{}] Rep.producer is closing, delay to retry(wait the producer close success)."
+                            + " state: {}", replicatorId, setStartingRes.getRight());
                 }
+                delayStartProducerAfterDisconnected();
             } else {
-                log.info("[{}] Replicator already being started. Replicator state: {}", replicatorId, state);
+                // Remaining states: {@link State#Terminating}, {@link State#Terminated}.
+                log.info("[{}] Skip the producer creation since the replicator state is : {}", replicatorId,
+                        setStartingRes.getRight());
             }
-
             return;
         }
 
         log.info("[{}] Starting replicator", replicatorId);
         producerBuilder.createAsync().thenAccept(producer -> {
-            readEntries(producer);
+            setProducerAndTriggerReadEntries(producer);
         }).exceptionally(ex -> {
-            if (STATE_UPDATER.compareAndSet(this, State.Starting, State.Stopped)) {
+            Pair<Boolean, State> setDisconnectedRes = compareSetAndGetState(State.Starting, State.Disconnected);
+            if (setDisconnectedRes.getLeft()) {
                 long waitTimeMs = backOff.next();
                 log.warn("[{}] Failed to create remote producer ({}), retrying in {} s",
                         replicatorId, ex.getMessage(), waitTimeMs / 1000.0);
-
                 // BackOff before retrying
-                brokerService.executor().schedule(this::checkTopicActiveAndRetryStartProducer, waitTimeMs,
-                        TimeUnit.MILLISECONDS);
+                scheduleCheckTopicActiveAndStartProducer(waitTimeMs);
             } else {
-                log.warn("[{}] Failed to create remote producer. Replicator state: {}", replicatorId,
-                        STATE_UPDATER.get(this), ex);
+                if (setDisconnectedRes.getRight() == State.Terminating
+                        || setDisconnectedRes.getRight() == State.Terminated) {
+                    log.info("[{}] Skip to create producer, because it has been terminated, state is : {}",
+                            replicatorId, setDisconnectedRes.getRight());
+                } else {
+                    // Unexpected, since only one task calls createAsync(): state is Disconnected/Starting/Started.
+                    log.warn("[{}] Other thread will try to create the producer again. so skipped current one task."
+                                    + " State is : {}",
+                            replicatorId, setDisconnectedRes.getRight());
+                }
             }
             return null;
         });
+    }
 
+    /**
+     * The producer is disconnecting, so delay starting a new one until after a backoff.
+     * Starting one immediately would fail, since the producer with the same name is still registered.
+     */
+    protected void delayStartProducerAfterDisconnected() {
+        long waitTimeMs = backOff.next();
+        if (log.isDebugEnabled()) {
+            log.debug(
+                    "[{}] waiting for producer to close before attempting to reconnect, retrying in {} s",
+                    replicatorId, waitTimeMs / 1000.0);
+        }
+        scheduleCheckTopicActiveAndStartProducer(waitTimeMs);
     }
 
-    protected void checkTopicActiveAndRetryStartProducer() {
-        isLocalTopicActive().thenAccept(isTopicActive -> {
-            if (isTopicActive) {
-                startProducer();
+    protected void scheduleCheckTopicActiveAndStartProducer(final long waitTimeMs) {
+        brokerService.executor().schedule(() -> {
+            State currentState = state;
+            if (currentState == State.Terminating || currentState == State.Terminated) {
+                log.info("[{}] Skip scheduled to start the producer since the replicator state is : {}",
+                        replicatorId, currentState);
+                return;
             }
-        }).exceptionally(ex -> {
-            log.warn("[{}] Stop retry to create producer due to topic load fail. Replicator state: {}", replicatorId,
-                    STATE_UPDATER.get(this), ex);
-            return null;
-        });
+            CompletableFuture<Optional<Topic>> topicFuture = brokerService.getTopics().get(localTopicName);
+            if (topicFuture == null) {
+                log.info("[{}] Skip scheduled to start the producer since the topic was closed successfully."
+                        + " And trigger a terminate.", replicatorId);
+                terminate();
+                return;
+            }
+            topicFuture.thenAccept(optional -> {
+                if (optional.isEmpty()) {
+                    // The topic was closed.
+                    log.info("[{}] Skip scheduled to start the producer since the topic was closed. And trigger a"
+                            + " terminate.", replicatorId);
+                    terminate();
+                    return;
+                }
+                if (optional.get() != localTopic) {
+                    // The topic was closed and re-created, so the current replicator is outdated.
+                    log.info("[{}] Skip scheduled to start the producer since the topic was re-created and the"
+                            + " current replicator is outdated. And trigger a terminate.", replicatorId);
+                    terminate();
+                    return;
+                }
+                Replicator replicator = localTopic.getReplicators().get(remoteCluster);
+                if (replicator != AbstractReplicator.this) {
+                    // The current replicator has been closed and replaced by a new one.
+                    log.info("[{}] Skip scheduled to start the producer since a new replicator has replaced the"
+                            + " current one. And trigger a terminate.", replicatorId);
+                    terminate();
+                    return;
+                }
+                startProducer();
+            }).exceptionally(ex -> {
+                log.warn("[{}] [{}] Stop retry to create producer due to unknown error(topic create failed), and"
+                                + " trigger a terminate. Replicator state: {}",
+                        localTopicName, replicatorId, STATE_UPDATER.get(this), ex);
+                terminate();
+                return null;
+            });
+        }, waitTimeMs, TimeUnit.MILLISECONDS);
     }
 
     protected CompletableFuture<Boolean> isLocalTopicActive() {
@@ -188,58 +270,130 @@ public abstract class AbstractReplicator {
         }, brokerService.executor());
     }
 
-    protected synchronized CompletableFuture<Void> closeProducerAsync() {
-        if (producer == null) {
-            STATE_UPDATER.set(this, State.Stopped);
+    /**
+     * Disconnects the internal producer; currently this method is only used by {@link PersistentTopic#checkGC}.
+     */
+    public CompletableFuture<Void> disconnect(boolean failIfHasBacklog, boolean closeTheStartingProducer) {
+        long backlog = getNumberOfEntriesInBacklog();
+        if (failIfHasBacklog && backlog > 0) {
+            CompletableFuture<Void> disconnectFuture = new CompletableFuture<>();
+            disconnectFuture.completeExceptionally(new TopicBusyException("Cannot close a replicator with backlog"));
+            if (log.isDebugEnabled()) {
+                log.debug("[{}] Replicator disconnect failed since topic has backlog", replicatorId);
+            }
+            return disconnectFuture;
+        }
+        log.info("[{}] Disconnect replicator at position {} with backlog {}", replicatorId,
+                getReplicatorReadPosition(), backlog);
+        return closeProducerAsync(closeTheStartingProducer);
+    }
+
+    /**
+     * Closes the internal producer; currently only used by {@link PersistentTopic#checkGC}.
+     * @param closeTheStartingProducer if the producer is still being created, retry closing it after a backoff.
+     */
+    protected CompletableFuture<Void> closeProducerAsync(boolean closeTheStartingProducer) {
+        Pair<Boolean, State> setDisconnectingRes = compareSetAndGetState(State.Started, State.Disconnecting);
+        if (!setDisconnectingRes.getLeft()) {
+            if (setDisconnectingRes.getRight() == State.Starting) {
+                if (closeTheStartingProducer) {
+                    /**
+                     * Delay and retry (wait for the in-progress producer creation to finish).
+                     * Note: if the producer keeps failing to start, the start-producer task keeps retrying until
+                     *   the state is changed to {@link State#Terminated}.
+                     *   Nit: a better solution is to create a {@link CompletableFuture} that tracks the in-progress
+                     *     creation and call "inProgressCreationFuture.thenApply(closeProducer())".
+                     */
+                    long waitTimeMs = backOff.next();
+                    brokerService.executor().schedule(() -> closeProducerAsync(true),
+                            waitTimeMs, TimeUnit.MILLISECONDS);
+                } else {
+                    log.info("[{}] Skip current producer closing since the previous producer has been closed,"
+                                    + " and trying start a new one, state : {}",
+                            replicatorId, setDisconnectingRes.getRight());
+                }
+            } else if (setDisconnectingRes.getRight() == State.Disconnected
+                    || setDisconnectingRes.getRight() == State.Disconnecting) {
+                log.info("[{}] Skip current producer closing since other thread did closing, state : {}",
+                        replicatorId, setDisconnectingRes.getRight());
+            } else if (setDisconnectingRes.getRight() == State.Terminating
+                    || setDisconnectingRes.getRight() == State.Terminated) {
+                log.info("[{}] Skip current producer closing since other thread is doing termination, state : {}",
+                        replicatorId, setDisconnectingRes.getRight());
+            }
             return CompletableFuture.completedFuture(null);
         }
-        CompletableFuture<Void> future = producer.closeAsync();
+
+        // Close producer and update state.
+        return doCloseProducerAsync(producer, () -> {
+            Pair<Boolean, State> setDisconnectedRes = compareSetAndGetState(State.Disconnecting, State.Disconnected);
+            if (setDisconnectedRes.getLeft()) {
+                this.producer = null;
+                // Deactivate further reads.
+                disableReplicatorRead();
+                return;
+            }
+            if (setDisconnectedRes.getRight() == State.Terminating
+                    || setDisconnectedRes.getRight() == State.Terminated) {
+                log.info("[{}] Skip setting state to disconnected because it was terminated, state : {}",
+                        replicatorId, setDisconnectedRes.getRight());
+            } else {
+                // Since only one task can call "doCloseProducerAsync(producer, action)", this scenario is not expected.
+                // So print a warn log.
+                log.warn("[{}] Other task has changed the state, so skipped the current one."
+                                + " State is : {}",
+                        replicatorId, setDisconnectedRes.getRight());
+            }
+        });
+    }
+
+    /** Closes {@code producer} (null: nothing to close), runs the action; on failure retries in the background. */
+    protected CompletableFuture<Void> doCloseProducerAsync(Producer<byte[]> producer, Runnable actionAfterClosed) {
+        CompletableFuture<Void> future =
+                producer == null ? CompletableFuture.completedFuture(null) : producer.closeAsync();
         return future.thenRun(() -> {
-            STATE_UPDATER.set(this, State.Stopped);
-            this.producer = null;
-            // deactivate further read
-            disableReplicatorRead();
+            actionAfterClosed.run();
         }).exceptionally(ex -> {
             long waitTimeMs = backOff.next();
             log.warn(
-                    "[{}] Exception: '{}' occurred while trying to close the producer."
-                            + " retrying again in {} s",
-                    replicatorId, ex.getMessage(), waitTimeMs / 1000.0);
+                    "[{}] Exception: '{}' occurred while trying to close the producer. Replicator state: {}."
+                            + " Retrying again in {} s.",
+                    replicatorId, ex.getMessage(), state, waitTimeMs / 1000.0);
             // BackOff before retrying
-            brokerService.executor().schedule(this::closeProducerAsync, waitTimeMs, TimeUnit.MILLISECONDS);
+            brokerService.executor().schedule(() -> doCloseProducerAsync(producer, actionAfterClosed),
+                    waitTimeMs, TimeUnit.MILLISECONDS);
             return null;
         });
     }
 
-
-    public CompletableFuture<Void> disconnect() {
-        return disconnect(false);
+    public CompletableFuture<Void> terminate() {
+        if (!tryChangeStatusToTerminating()) {
+            log.info("[{}] Skip current termination since other thread is doing termination, state : {}", replicatorId,
+                    state);
+            return CompletableFuture.completedFuture(null);
+        }
+        return doCloseProducerAsync(producer, () -> {
+            STATE_UPDATER.set(this, State.Terminated);
+            this.producer = null;
+            // Deactivate further reads by marking the cursor as inactive.
+            disableReplicatorRead();
+        });
     }
 
-    public synchronized CompletableFuture<Void> disconnect(boolean failIfHasBacklog) {
-        if (failIfHasBacklog && getNumberOfEntriesInBacklog() > 0) {
-            CompletableFuture<Void> disconnectFuture = new CompletableFuture<>();
-            disconnectFuture.completeExceptionally(new TopicBusyException("Cannot close a replicator with backlog"));
-            if (log.isDebugEnabled()) {
-                log.debug("[{}] Replicator disconnect failed since topic has backlog", replicatorId);
-            }
-            return disconnectFuture;
-        }
-
-        if (STATE_UPDATER.get(this) == State.Stopping) {
-            // Do nothing since the all "STATE_UPDATER.set(this, Stopping)" instructions are followed by
-            // closeProducerAsync()
-            // which will at some point change the state to stopped
-            return CompletableFuture.completedFuture(null);
-        }
-
-        if (STATE_UPDATER.compareAndSet(this, State.Starting, State.Stopping)
-                || STATE_UPDATER.compareAndSet(this, State.Started, State.Stopping)) {
-            log.info("[{}] Disconnect replicator at position {} with backlog {}", replicatorId,
-                    getReplicatorReadPosition(), getNumberOfEntriesInBacklog());
-        }
-
-        return closeProducerAsync();
+    /**
+     * Moves the state to {@link State#Terminating} from any non-terminal state, retrying the CAS until it
+     * succeeds, so a concurrent transition (e.g. Disconnected to Starting) cannot make termination be skipped.
+     * @return true if this call changed the state, false if it was already Terminating or Terminated.
+     */
+    protected boolean tryChangeStatusToTerminating() {
+        State current = state;
+        while (current != State.Terminating && current != State.Terminated) {
+            if (STATE_UPDATER.compareAndSet(this, current, State.Terminating)) {
+                return true;
+            }
+            current = state;
+        }
+        return false;
     }
 
     public CompletableFuture<Void> remove() {
@@ -300,4 +454,18 @@ public abstract class AbstractReplicator {
     public State getState() {
         return state;
     }
+
+    protected ImmutablePair<Boolean, State> compareSetAndGetState(State expect, State update) {
+        while (true) {
+            State before = state;
+            if (STATE_UPDATER.compareAndSet(this, expect, update)) {
+                return ImmutablePair.of(true, expect);
+            }
+            State after = state;
+            // Report the state if it was stable across the failed CAS; an A-B-A change is tolerated to avoid a lock.
+            if (before == after) {
+                return ImmutablePair.of(false, before);
+            }
+        }
+    }
 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 9654f198d7e..ea55a43c7f0 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -689,7 +689,7 @@ public class BrokerService implements Closeable {
                 if (ot.isPresent()) {
                     Replicator r = ot.get().getReplicators().get(clusterName);
                     if (r != null && r.isConnected()) {
-                        r.disconnect(false).whenComplete((v, e) -> 
f.complete(null));
+                        r.terminate().whenComplete((v, e) -> f.complete(null));
                         return;
                     }
                 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Replicator.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Replicator.java
index 482fa2cbd23..8130b855b4e 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Replicator.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Replicator.java
@@ -29,9 +29,9 @@ public interface Replicator {
 
     ReplicatorStatsImpl getStats();
 
-    CompletableFuture<Void> disconnect();
+    CompletableFuture<Void> terminate();
 
-    CompletableFuture<Void> disconnect(boolean b);
+    CompletableFuture<Void> disconnect(boolean failIfHasBacklog, boolean 
closeTheStartingProducer);
 
     void updateRates();
 
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentReplicator.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentReplicator.java
index 087c5f93200..51509f3818a 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentReplicator.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentReplicator.java
@@ -67,7 +67,7 @@ public class NonPersistentReplicator extends 
AbstractReplicator implements Repli
     }
 
     @Override
-    protected void readEntries(Producer<byte[]> producer) {
+    protected void setProducerAndTriggerReadEntries(Producer<byte[]> producer) 
{
         this.producer = (ProducerImpl) producer;
 
         if (STATE_UPDATER.compareAndSet(this, State.Starting, State.Started)) {
@@ -78,8 +78,7 @@ public class NonPersistentReplicator extends 
AbstractReplicator implements Repli
                     "[{}] Replicator was stopped while creating the producer."
                             + " Closing it. Replicator state: {}",
                     replicatorId, STATE_UPDATER.get(this));
-            STATE_UPDATER.set(this, State.Stopping);
-            closeProducerAsync();
+            doCloseProducerAsync(producer, () -> {});
             return;
         }
     }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
index b0c6443332b..ecc6134ecda 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
@@ -420,7 +420,7 @@ public class NonPersistentTopic extends AbstractTopic 
implements Topic, TopicPol
             CompletableFuture<Void> closeClientFuture = new 
CompletableFuture<>();
             if (closeIfClientsConnected) {
                 List<CompletableFuture<Void>> futures = new ArrayList<>();
-                replicators.forEach((cluster, replicator) -> 
futures.add(replicator.disconnect()));
+                replicators.forEach((cluster, replicator) -> 
futures.add(replicator.terminate()));
                 producers.values().forEach(producer -> 
futures.add(producer.disconnect()));
                 subscriptions.forEach((s, sub) -> futures.add(sub.close(true, 
Optional.empty())));
                 FutureUtil.waitForAll(futures).thenRun(() -> {
@@ -524,7 +524,7 @@ public class NonPersistentTopic extends AbstractTopic 
implements Topic, TopicPol
 
         List<CompletableFuture<Void>> futures = new ArrayList<>();
 
-        replicators.forEach((cluster, replicator) -> 
futures.add(replicator.disconnect()));
+        replicators.forEach((cluster, replicator) -> 
futures.add(replicator.terminate()));
         if (disconnectClients) {
             futures.add(ExtensibleLoadManagerImpl.getAssignedBrokerLookupData(
                 brokerService.getPulsar(), topic).thenAccept(lookupData -> {
@@ -583,7 +583,7 @@ public class NonPersistentTopic extends AbstractTopic 
implements Topic, TopicPol
 
     public CompletableFuture<Void> stopReplProducers() {
         List<CompletableFuture<Void>> closeFutures = new ArrayList<>();
-        replicators.forEach((region, replicator) -> 
closeFutures.add(replicator.disconnect()));
+        replicators.forEach((region, replicator) -> 
closeFutures.add(replicator.terminate()));
         return FutureUtil.waitForAll(closeFutures);
     }
 
@@ -663,7 +663,7 @@ public class NonPersistentTopic extends AbstractTopic 
implements Topic, TopicPol
 
         String name = 
NonPersistentReplicator.getReplicatorName(replicatorPrefix, remoteCluster);
 
-        replicators.get(remoteCluster).disconnect().thenRun(() -> {
+        replicators.get(remoteCluster).terminate().thenRun(() -> {
             log.info("[{}] Successfully removed replicator {}", name, 
remoteCluster);
             replicators.remove(remoteCluster);
 
@@ -1032,7 +1032,7 @@ public class NonPersistentTopic extends AbstractTopic 
implements Topic, TopicPol
         List<CompletableFuture<Void>> futures = new ArrayList<>();
         ConcurrentOpenHashMap<String, NonPersistentReplicator> replicators = 
getReplicators();
         replicators.forEach((r, replicator) -> {
-            futures.add(replicator.disconnect());
+            futures.add(replicator.terminate());
         });
         return FutureUtil.waitForAll(futures);
     }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
index a4ac52d5ded..c6153e81a7a 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
@@ -18,6 +18,10 @@
  */
 package org.apache.pulsar.broker.service.persistent;
 
+import static 
org.apache.pulsar.broker.service.AbstractReplicator.State.Started;
+import static 
org.apache.pulsar.broker.service.AbstractReplicator.State.Starting;
+import static 
org.apache.pulsar.broker.service.AbstractReplicator.State.Terminated;
+import static 
org.apache.pulsar.broker.service.AbstractReplicator.State.Terminating;
 import static 
org.apache.pulsar.broker.service.persistent.PersistentTopic.MESSAGE_RATE_BACKOFF_MS;
 import com.google.common.annotations.VisibleForTesting;
 import io.netty.buffer.ByteBuf;
@@ -26,7 +30,6 @@ import io.netty.util.Recycler.Handle;
 import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CompletionException;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
@@ -43,10 +46,10 @@ import 
org.apache.bookkeeper.mledger.ManagedLedgerException.CursorAlreadyClosedE
 import 
org.apache.bookkeeper.mledger.ManagedLedgerException.TooManyRequestsException;
 import org.apache.bookkeeper.mledger.Position;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pulsar.broker.PulsarServerException;
 import org.apache.pulsar.broker.service.AbstractReplicator;
 import org.apache.pulsar.broker.service.BrokerService;
-import 
org.apache.pulsar.broker.service.BrokerServiceException.TopicBusyException;
 import org.apache.pulsar.broker.service.MessageExpirer;
 import org.apache.pulsar.broker.service.Replicator;
 import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter.Type;
@@ -134,30 +137,44 @@ public abstract class PersistentReplicator extends 
AbstractReplicator
     }
 
     @Override
-    protected void readEntries(Producer<byte[]> producer) {
-        // Rewind the cursor to be sure to read again all non-acked messages 
sent while restarting
+    protected void setProducerAndTriggerReadEntries(Producer<byte[]> producer) 
{
+        // Rewind the cursor to be sure to read again all non-acked messages 
sent while restarting.
         cursor.rewind();
-
         cursor.cancelPendingReadRequest();
-        HAVE_PENDING_READ_UPDATER.set(this, FALSE);
-        this.producer = (ProducerImpl) producer;
 
-        if (STATE_UPDATER.compareAndSet(this, State.Starting, State.Started)) {
-            log.info("[{}] Created replicator producer", replicatorId);
+        /**
+         * 1. Try change state to {@link Started}.
+         * 2. Atoms modify multiple properties if change state success, to 
avoid another thread get a null value
+         *    producer when the state is {@link Started}.
+         */
+        Pair<Boolean, State> changeStateRes;
+        changeStateRes = compareSetAndGetState(Starting, Started);
+        if (changeStateRes.getLeft()) {
+            this.producer = (ProducerImpl) producer;
+            HAVE_PENDING_READ_UPDATER.set(this, FALSE);
+            // Trigger a new read.
+            log.info("[{}] Created replicator producer, Replicator state: {}", 
replicatorId, state);
             backOff.reset();
-            // activate cursor: so, entries can be cached
+            // activate cursor: so, entries can be cached.
             this.cursor.setActive();
             // read entries
             readMoreEntries();
         } else {
-            log.info(
-                    "[{}] Replicator was stopped while creating the producer."
-                            + " Closing it. Replicator state: {}",
-                    replicatorId, STATE_UPDATER.get(this));
-            STATE_UPDATER.set(this, State.Stopping);
-            closeProducerAsync();
+            if (changeStateRes.getRight() == Started) {
+                // Since only one task can call 
"producerBuilder.createAsync()", this scenario is not expected.
+                // So print a warn log.
+                log.warn("[{}] Replicator was already started by another 
thread while creating the producer."
+                        + " Closing the producer newly created. Replicator 
state: {}", replicatorId, state);
+            } else if (changeStateRes.getRight() == Terminating || 
changeStateRes.getRight() == Terminated) {
+                log.info("[{}] Replicator was terminated, so close the 
producer. Replicator state: {}",
+                        replicatorId, state);
+            } else {
+                log.error("[{}] Replicator state is not expected, so close the 
producer. Replicator state: {}",
+                        replicatorId, changeStateRes.getRight());
+            }
+            // Close the producer if change the state fail.
+            doCloseProducerAsync(producer, () -> {});
         }
-
     }
 
     @Override
@@ -420,8 +437,8 @@ public abstract class PersistentReplicator extends 
AbstractReplicator
 
     @Override
     public void readEntriesFailed(ManagedLedgerException exception, Object 
ctx) {
-        if (STATE_UPDATER.get(this) != State.Started) {
-            log.info("[{}] Replicator was stopped while reading entries."
+        if (state != Started) {
+            log.info("[{}] Replicator was disconnected while reading entries."
                             + " Stop reading. Replicator state: {}",
                     replicatorId, STATE_UPDATER.get(this));
             return;
@@ -436,8 +453,8 @@ public abstract class PersistentReplicator extends 
AbstractReplicator
             log.error("[{}] Error reading entries because replicator is"
                             + " already deleted and cursor is already closed 
{}, ({})",
                     replicatorId, ctx, exception.getMessage(), exception);
-            // replicator is already deleted and cursor is already closed so, 
producer should also be stopped
-            closeProducerAsync();
+            // replicator is already deleted and cursor is already closed so, 
producer should also be disconnected.
+            terminate();
             return;
         } else if (!(exception instanceof TooManyRequestsException)) {
             log.error("[{}] Error reading entries at {}. Retrying to read in 
{}s. ({})",
@@ -555,8 +572,8 @@ public abstract class PersistentReplicator extends 
AbstractReplicator
         if (exception instanceof CursorAlreadyClosedException) {
             log.error("[{}] Asynchronous ack failure because replicator is 
already deleted and cursor is already"
                             + " closed {}, ({})", replicatorId, ctx, 
exception.getMessage(), exception);
-            // replicator is already deleted and cursor is already closed so, 
producer should also be stopped
-            closeProducerAsync();
+            // replicator is already deleted and cursor is already closed so, 
producer should also be disconnected.
+            terminate();
             return;
         }
         if (ctx instanceof PositionImpl) {
@@ -675,30 +692,6 @@ public abstract class PersistentReplicator extends 
AbstractReplicator
         }
     }
 
-    @Override
-    public CompletableFuture<Void> disconnect() {
-        return disconnect(false);
-    }
-
-    @Override
-    public synchronized CompletableFuture<Void> disconnect(boolean 
failIfHasBacklog) {
-        final CompletableFuture<Void> future = new CompletableFuture<>();
-
-        super.disconnect(failIfHasBacklog).thenRun(() -> {
-            dispatchRateLimiter.ifPresent(DispatchRateLimiter::close);
-            future.complete(null);
-        }).exceptionally(ex -> {
-            Throwable t = (ex instanceof CompletionException ? ex.getCause() : 
ex);
-            if (!(t instanceof TopicBusyException)) {
-                log.error("[{}] Failed to close dispatch rate limiter: {}", 
replicatorId, ex.getMessage());
-            }
-            future.completeExceptionally(t);
-            return null;
-        });
-
-        return future;
-    }
-
     @Override
     public boolean isConnected() {
         ProducerImpl<?> producer = this.producer;
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index cce77013030..7d3ca226fe4 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -799,15 +799,15 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
 
     public CompletableFuture<Void> stopReplProducers() {
         List<CompletableFuture<Void>> closeFutures = new ArrayList<>();
-        replicators.forEach((region, replicator) -> 
closeFutures.add(replicator.disconnect()));
-        shadowReplicators.forEach((__, replicator) -> 
closeFutures.add(replicator.disconnect()));
+        replicators.forEach((region, replicator) -> 
closeFutures.add(replicator.terminate()));
+        shadowReplicators.forEach((__, replicator) -> 
closeFutures.add(replicator.terminate()));
         return FutureUtil.waitForAll(closeFutures);
     }
 
     private synchronized CompletableFuture<Void> 
closeReplProducersIfNoBacklog() {
         List<CompletableFuture<Void>> closeFutures = new ArrayList<>();
-        replicators.forEach((region, replicator) -> 
closeFutures.add(replicator.disconnect(true)));
-        shadowReplicators.forEach((__, replicator) -> 
closeFutures.add(replicator.disconnect(true)));
+        replicators.forEach((region, replicator) -> 
closeFutures.add(replicator.disconnect(true, true)));
+        shadowReplicators.forEach((__, replicator) -> 
closeFutures.add(replicator.disconnect(true, true)));
         return FutureUtil.waitForAll(closeFutures);
     }
 
@@ -1389,8 +1389,8 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
                 List<CompletableFuture<Void>> futures = new ArrayList<>();
                 subscriptions.forEach((s, sub) -> futures.add(sub.close(true, 
Optional.empty())));
                 if (closeIfClientsConnected) {
-                    replicators.forEach((cluster, replicator) -> 
futures.add(replicator.disconnect()));
-                    shadowReplicators.forEach((__, replicator) -> 
futures.add(replicator.disconnect()));
+                    replicators.forEach((cluster, replicator) -> 
futures.add(replicator.terminate()));
+                    shadowReplicators.forEach((__, replicator) -> 
futures.add(replicator.terminate()));
                     producers.values().forEach(producer -> 
futures.add(producer.disconnect()));
                 }
                 FutureUtil.waitForAll(futures).thenRunAsync(() -> {
@@ -1532,8 +1532,8 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
         List<CompletableFuture<Void>> futures = new ArrayList<>();
 
         futures.add(transactionBuffer.closeAsync());
-        replicators.forEach((cluster, replicator) -> 
futures.add(replicator.disconnect()));
-        shadowReplicators.forEach((__, replicator) -> 
futures.add(replicator.disconnect()));
+        replicators.forEach((cluster, replicator) -> 
futures.add(replicator.terminate()));
+        shadowReplicators.forEach((__, replicator) -> 
futures.add(replicator.terminate()));
         if (disconnectClients) {
             futures.add(ExtensibleLoadManagerImpl.getAssignedBrokerLookupData(
                 brokerService.getPulsar(), topic).thenAccept(lookupData -> {
@@ -1908,7 +1908,7 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
 
         String name = PersistentReplicator.getReplicatorName(replicatorPrefix, 
remoteCluster);
 
-        
Optional.ofNullable(replicators.get(remoteCluster)).map(Replicator::disconnect)
+        
Optional.ofNullable(replicators.get(remoteCluster)).map(Replicator::terminate)
                 .orElse(CompletableFuture.completedFuture(null)).thenRun(() -> 
{
             ledger.asyncDeleteCursor(name, new DeleteCursorCallback() {
                 @Override
@@ -1980,7 +1980,7 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
         log.info("[{}] Removing shadow topic replicator to {}", topic, 
shadowTopic);
         final CompletableFuture<Void> future = new CompletableFuture<>();
         String name = 
ShadowReplicator.getShadowReplicatorName(replicatorPrefix, shadowTopic);
-        shadowReplicators.get(shadowTopic).disconnect().thenRun(() -> {
+        shadowReplicators.get(shadowTopic).terminate().thenRun(() -> {
 
             ledger.asyncDeleteCursor(name, new DeleteCursorCallback() {
                 @Override
@@ -2849,7 +2849,7 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
         ConcurrentOpenHashMap<String, Replicator> replicators = 
getReplicators();
         replicators.forEach((r, replicator) -> {
             if (replicator.getNumberOfEntriesInBacklog() <= 0) {
-                futures.add(replicator.disconnect());
+                futures.add(replicator.terminate());
             }
         });
         return FutureUtil.waitForAll(futures);
@@ -2900,6 +2900,15 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
                     log.debug("[{}] Global topic inactive for {} seconds, 
closing repl producers.", topic,
                         maxInactiveDurationInSec);
                 }
+                /**
+                 * There is a race condition that may cause an NPE:
+                 * - task 1: a callback of "replicator.cursor.asyncRead" will 
trigger a replication.
+                 * - task 2: "closeReplProducersIfNoBacklog", called by the current 
thread, will set the variable
+                 *   "replicator.producer" to null.
+                 * Race condition: task 1 will get an NPE when it tries to send 
messages using the variable
+                 * "replicator.producer", because task 2 will set this 
variable to "null".
+                 * TODO Create a separate PR to fix it.
+                 */
                 closeReplProducersIfNoBacklog().thenRun(() -> {
                     if (hasRemoteProducers()) {
                         if (log.isDebugEnabled()) {
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractReplicatorTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractReplicatorTest.java
index 8699c732468..7aebf20896c 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractReplicatorTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractReplicatorTest.java
@@ -43,6 +43,7 @@ import org.apache.pulsar.client.api.ProducerBuilder;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.impl.ConnectionPool;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
+import org.apache.pulsar.common.policies.data.stats.ReplicatorStatsImpl;
 import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
 import org.awaitility.Awaitility;
 import org.awaitility.reflect.WhiteboxImpl;
@@ -94,7 +95,7 @@ public class AbstractReplicatorTest {
         final ReplicatorInTest replicator = new ReplicatorInTest(localCluster, 
localTopic, remoteCluster, topicName,
                 replicatorPrefix, broker, remoteClient);
         replicator.startProducer();
-        replicator.disconnect();
+        replicator.terminate();
 
         // Verify task will done.
         Awaitility.await().atMost(10, TimeUnit.SECONDS).untilAsserted(() -> {
@@ -129,7 +130,7 @@ public class AbstractReplicatorTest {
         }
 
         @Override
-        protected void readEntries(Producer<byte[]> producer) {
+        protected void setProducerAndTriggerReadEntries(Producer<byte[]> 
producer) {
 
         }
 
@@ -139,7 +140,22 @@ public class AbstractReplicatorTest {
         }
 
         @Override
-        protected long getNumberOfEntriesInBacklog() {
+        public ReplicatorStatsImpl getStats() {
+            return null;
+        }
+
+        @Override
+        public void updateRates() {
+
+        }
+
+        @Override
+        public boolean isConnected() {
+            return false;
+        }
+
+        @Override
+        public long getNumberOfEntriesInBacklog() {
             return 0;
         }
 
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java
index 1accd04f491..f9184f2288f 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java
@@ -18,28 +18,56 @@
  */
 package org.apache.pulsar.broker.service;
 
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.spy;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotEquals;
 import static org.testng.Assert.assertTrue;
+import io.netty.util.concurrent.FastThreadLocalThread;
 import java.lang.reflect.Field;
+import java.lang.reflect.Method;
+import java.time.Duration;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.BiFunction;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.SneakyThrows;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.mledger.AsyncCallbacks;
+import org.apache.bookkeeper.mledger.ManagedLedgerException;
+import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
+import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
 import org.apache.pulsar.broker.BrokerTestUtil;
 import org.apache.pulsar.broker.service.persistent.PersistentReplicator;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerBuilder;
+import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.impl.ProducerBuilderImpl;
 import org.apache.pulsar.client.impl.ProducerImpl;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
+import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
 import org.apache.pulsar.common.policies.data.TopicStats;
+import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
 import org.awaitility.Awaitility;
+import org.awaitility.reflect.WhiteboxImpl;
 import org.mockito.Mockito;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
+@Slf4j
 @Test(groups = "broker")
 public class OneWayReplicatorTest extends OneWayReplicatorTestBase {
 
@@ -78,7 +106,7 @@ public class OneWayReplicatorTest extends 
OneWayReplicatorTestBase {
         return originalValue;
     }
 
-    @Test
+    @Test(timeOut = 45 * 1000)
     public void testReplicatorProducerStatInTopic() throws Exception {
         final String topicName = BrokerTestUtil.newUniqueName("persistent://" 
+ defaultNamespace + "/tp_");
         final String subscribeName = "subscribe_1";
@@ -104,7 +132,7 @@ public class OneWayReplicatorTest extends 
OneWayReplicatorTestBase {
         });
     }
 
-    @Test
+    @Test(timeOut = 45 * 1000)
     public void testCreateRemoteConsumerFirst() throws Exception {
         final String topicName = BrokerTestUtil.newUniqueName("persistent://" 
+ defaultNamespace + "/tp_");
         Producer<String> producer1 = 
client1.newProducer(Schema.STRING).topic(topicName).create();
@@ -124,29 +152,257 @@ public class OneWayReplicatorTest extends 
OneWayReplicatorTestBase {
         });
     }
 
-    @Test
+    @Test(timeOut = 45 * 1000)
     public void testTopicCloseWhenInternalProducerCloseErrorOnce() throws 
Exception {
         final String topicName = BrokerTestUtil.newUniqueName("persistent://" 
+ defaultNamespace + "/tp_");
         admin1.topics().createNonPartitionedTopic(topicName);
         // Wait for replicator started.
         waitReplicatorStarted(topicName);
-        PersistentTopic persistentTopic =
+        PersistentTopic topic1 =
                 (PersistentTopic) 
pulsar1.getBrokerService().getTopic(topicName, false).join().get();
-        PersistentReplicator replicator =
-                (PersistentReplicator) 
persistentTopic.getReplicators().values().iterator().next();
+        PersistentReplicator replicator1 =
+                (PersistentReplicator) 
topic1.getReplicators().values().iterator().next();
         // Mock an error when calling "replicator.disconnect()"
-        ProducerImpl mockProducer = Mockito.mock(ProducerImpl.class);
-        
Mockito.when(mockProducer.closeAsync()).thenReturn(CompletableFuture.failedFuture(new
 Exception("mocked ex")));
-        ProducerImpl originalProducer = 
overrideProducerForReplicator(replicator, mockProducer);
+        AtomicBoolean closeFailed = new AtomicBoolean(true);
+        final ProducerImpl mockProducer = Mockito.mock(ProducerImpl.class);
+        final AtomicReference<ProducerImpl> originalProducer1 = new 
AtomicReference();
+        doAnswer(invocation -> {
+            if (closeFailed.get()) {
+                return CompletableFuture.failedFuture(new Exception("mocked 
ex"));
+            } else {
+                return originalProducer1.get().closeAsync();
+            }
+        }).when(mockProducer).closeAsync();
+        originalProducer1.set(overrideProducerForReplicator(replicator1, 
mockProducer));
         // Verify: since the "replicator.producer.closeAsync()" will retry 
after it failed, the topic unload should be
         // successful.
         admin1.topics().unload(topicName);
         // Verify: After "replicator.producer.closeAsync()" retry again, the 
"replicator.producer" will be closed
         // successful.
-        overrideProducerForReplicator(replicator, originalProducer);
+        closeFailed.set(false);
+        AtomicReference<PersistentTopic> topic2 = new AtomicReference();
+        AtomicReference<PersistentReplicator> replicator2 = new 
AtomicReference();
         Awaitility.await().untilAsserted(() -> {
+            topic2.set((PersistentTopic) 
pulsar1.getBrokerService().getTopic(topicName, false).join().get());
+            replicator2.set((PersistentReplicator) 
topic2.get().getReplicators().values().iterator().next());
+            // It is a new Topic after reloading.
+            assertNotEquals(topic2.get(), topic1);
+            assertNotEquals(replicator2.get(), replicator1);
+        });
+        Awaitility.await().untilAsserted(() -> {
+            // Old replicator should be closed.
+            Assert.assertFalse(replicator1.isConnected());
+            Assert.assertFalse(originalProducer1.get().isConnected());
+            // New replicator should be connected.
+            Assert.assertTrue(replicator2.get().isConnected());
+        });
+        // cleanup.
+        cleanupTopics(() -> {
+            admin1.topics().delete(topicName);
+            admin2.topics().delete(topicName);
+        });
+    }
+
+    private void injectMockReplicatorProducerBuilder(
+                                BiFunction<ProducerConfigurationData, 
ProducerImpl, ProducerImpl> producerDecorator)
+            throws Exception {
+        String cluster2 = pulsar2.getConfig().getClusterName();
+        BrokerService brokerService = pulsar1.getBrokerService();
+        // Wait for the internal client created.
+        final String topicNameTriggerInternalClientCreate =
+                BrokerTestUtil.newUniqueName("persistent://" + 
defaultNamespace + "/tp_");
+        
admin1.topics().createNonPartitionedTopic(topicNameTriggerInternalClientCreate);
+        waitReplicatorStarted(topicNameTriggerInternalClientCreate);
+        cleanupTopics(() -> {
+            admin1.topics().delete(topicNameTriggerInternalClientCreate);
+            admin2.topics().delete(topicNameTriggerInternalClientCreate);
+        });
+
+        // Inject spy client.
+        ConcurrentOpenHashMap<String, PulsarClient>
+                replicationClients = 
WhiteboxImpl.getInternalState(brokerService, "replicationClients");
+        PulsarClientImpl internalClient = (PulsarClientImpl) 
replicationClients.get(cluster2);
+        PulsarClient spyClient = spy(internalClient);
+        replicationClients.put(cluster2, spyClient);
+
+        // Inject producer decorator.
+        doAnswer(invocation -> {
+            Schema schema = (Schema) invocation.getArguments()[0];
+            ProducerBuilderImpl<?> producerBuilder = (ProducerBuilderImpl) 
internalClient.newProducer(schema);
+            ProducerBuilder spyProducerBuilder = spy(producerBuilder);
+            doAnswer(ignore -> {
+                CompletableFuture<Producer> producerFuture = new 
CompletableFuture<>();
+                producerBuilder.createAsync().whenComplete((p, t) -> {
+                    if (t != null) {
+                        producerFuture.completeExceptionally(t);
+                        return;
+                    }
+                    ProducerImpl pImpl = (ProducerImpl) p;
+                    new FastThreadLocalThread(() -> {
+                        try {
+                            ProducerImpl newProducer = 
producerDecorator.apply(producerBuilder.getConf(), pImpl);
+                            producerFuture.complete(newProducer);
+                        } catch (Exception ex) {
+                            producerFuture.completeExceptionally(ex);
+                        }
+                    }).start();
+                });
+
+                return producerFuture;
+            }).when(spyProducerBuilder).createAsync();
+            return spyProducerBuilder;
+        }).when(spyClient).newProducer(any(Schema.class));
+    }
+
+    private SpyCursor spyCursor(PersistentTopic persistentTopic, String 
cursorName) throws Exception {
+        ManagedLedgerImpl ml = (ManagedLedgerImpl) 
persistentTopic.getManagedLedger();
+        ManagedCursorImpl cursor = (ManagedCursorImpl) 
ml.getCursors().get(cursorName);
+        ManagedCursorImpl spyCursor = spy(cursor);
+        // remove cursor.
+        ml.getCursors().removeCursor(cursorName);
+        ml.deactivateCursor(cursor);
+        // Add the spy one. addCursor(ManagedCursorImpl cursor)
+        Method m = ManagedLedgerImpl.class.getDeclaredMethod("addCursor", new 
Class[]{ManagedCursorImpl.class});
+        m.setAccessible(true);
+        m.invoke(ml, new Object[]{spyCursor});
+        return new SpyCursor(cursor, spyCursor);
+    }
+
+    @Data
+    @AllArgsConstructor
+    static class SpyCursor {
+        ManagedCursorImpl original;
+        ManagedCursorImpl spy;
+    }
+
+    private CursorCloseSignal makeCursorClosingDelay(SpyCursor spyCursor) 
throws Exception {
+        CountDownLatch startCloseSignal = new CountDownLatch(1);
+        CountDownLatch startCallbackSignal = new CountDownLatch(1);
+        doAnswer(invocation -> {
+            AsyncCallbacks.CloseCallback originalCallback = 
(AsyncCallbacks.CloseCallback) invocation.getArguments()[0];
+            Object ctx = invocation.getArguments()[1];
+            AsyncCallbacks.CloseCallback newCallback = new 
AsyncCallbacks.CloseCallback() {
+                @Override
+                public void closeComplete(Object ctx) {
+                    new FastThreadLocalThread(new Runnable() {
+                        @Override
+                        @SneakyThrows
+                        public void run() {
+                            startCallbackSignal.await();
+                            originalCallback.closeComplete(ctx);
+                        }
+                    }).start();
+                }
+
+                @Override
+                public void closeFailed(ManagedLedgerException exception, 
Object ctx) {
+                    new FastThreadLocalThread(new Runnable() {
+                        @Override
+                        @SneakyThrows
+                        public void run() {
+                            startCallbackSignal.await();
+                            originalCallback.closeFailed(exception, ctx);
+                        }
+                    }).start();
+                }
+            };
+            startCloseSignal.await();
+            spyCursor.original.asyncClose(newCallback, ctx);
+            return null;
+        
}).when(spyCursor.spy).asyncClose(any(AsyncCallbacks.CloseCallback.class), 
any());
+        return new CursorCloseSignal(startCloseSignal, startCallbackSignal);
+    }
+
+    @AllArgsConstructor
+    static class CursorCloseSignal {
+        CountDownLatch startCloseSignal;
+        CountDownLatch startCallbackSignal;
+
+        void startClose() {
+            startCloseSignal.countDown();
+        }
+
+        void startCallback() {
+            startCallbackSignal.countDown();
+        }
+    }
+
+    /**
+     * See the description and execution flow: 
https://github.com/apache/pulsar/pull/21946.
+     * Steps:
+     * - Create topic, but creating the Replicator's internal producer fails.
+     * - Unload bundle: the Replicator will be closed, but the retry of the internal 
producer creation has not executed yet.
+     * - The retry of the internal producer creation executes successfully while the 
"repl.cursor" has not been closed yet.
+     * - The topic is fully closed.
+     * - Verify: the internal producer that was created late will be closed.
+     */
+    @Test(timeOut = 120 * 1000)
+    public void testConcurrencyOfUnloadBundleAndRecreateProducer() throws 
Exception {
+        final String topicName = BrokerTestUtil.newUniqueName("persistent://" 
+ defaultNamespace + "/tp_");
+        // Inject an error for "replicator.producer" creation.
+        // The delay time of next retry to create producer is below:
+        //   0.1s, 0.2, 0.4, 0.8, 1.6s, 3.2s, 6.4s...
+        //   If the retry counter is larger than 6, the next creation will be 
slow enough to close Replicator.
+        final AtomicInteger createProducerCounter = new AtomicInteger();
+        final int failTimes = 6;
+        injectMockReplicatorProducerBuilder((producerCnf, originalProducer) -> 
{
+            if (topicName.equals(producerCnf.getTopicName())) {
+                // There is a switch to determine create producer successfully 
or not.
+                if (createProducerCounter.incrementAndGet() > failTimes) {
+                    return originalProducer;
+                }
+                log.info("Retry create replicator.producer count: {}", 
createProducerCounter);
+                // Release producer and fail callback.
+                originalProducer.closeAsync();
+                throw new RuntimeException("mock error");
+            }
+            return originalProducer;
+        });
+
+        // Create topic.
+        admin1.topics().createNonPartitionedTopic(topicName);
+        PersistentTopic persistentTopic =
+                (PersistentTopic) 
pulsar1.getBrokerService().getTopic(topicName, false).join().get();
+        PersistentReplicator replicator =
+                (PersistentReplicator) 
persistentTopic.getReplicators().values().iterator().next();
+        // Since we inject a producer creation error, the replicator can not 
start successfully.
+        assertFalse(replicator.isConnected());
+
+        // Block the closing of the cursor ("pulsar.repl") until the internal 
producer of the replicator has started.
+        SpyCursor spyCursor =
+                spyCursor(persistentTopic, "pulsar.repl." + 
pulsar2.getConfig().getClusterName());
+        CursorCloseSignal cursorCloseSignal = 
makeCursorClosingDelay(spyCursor);
+
+        // Unload bundle: call "topic.close(true)".
+        // Delay starting the new producer until the state of the replicator changes to 
Stopped.
+        // The next attempt of "createProducerSuccessAfterFailTimes" to create the 
producer will succeed.
+        
Awaitility.await().pollInterval(Duration.ofMillis(100)).atMost(Duration.ofSeconds(60)).untilAsserted(()
 -> {
+            assertTrue(createProducerCounter.get() >= failTimes,
+                    "count of retry to create producer is " + 
createProducerCounter.get());
+        });
+        CompletableFuture<Void> topicCloseFuture = persistentTopic.close(true);
+        Awaitility.await().atMost(Duration.ofSeconds(30)).untilAsserted(() -> {
+            String state = String.valueOf(replicator.getState());
+            assertTrue(state.equals("Stopped") || state.equals("Terminated"));
+        });
+
+        // Delay closing the cursor until "replicator.producer" is created successfully.
+        // The next retry to create "replicator.producer" will happen after 
3.2s.
+        Thread.sleep(4 * 1000);
+        log.info("Replicator.state: {}", replicator.getState());
+        cursorCloseSignal.startClose();
+        cursorCloseSignal.startCallback();
+
+        // Wait for topic close successfully.
+        // Verify there is no orphan producer on the remote cluster.
+        topicCloseFuture.join();
+        
Awaitility.await().pollInterval(Duration.ofSeconds(1)).untilAsserted(() -> {
+            PersistentTopic persistentTopic2 =
+                    (PersistentTopic) 
pulsar2.getBrokerService().getTopic(topicName, false).join().get();
+            assertEquals(persistentTopic2.getProducers().size(), 0);
             Assert.assertFalse(replicator.isConnected());
         });
+
         // cleanup.
         cleanupTopics(() -> {
             admin1.topics().delete(topicName);
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTestBase.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTestBase.java
index 795983f7482..24aec851e19 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTestBase.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTestBase.java
@@ -18,21 +18,28 @@
  */
 package org.apache.pulsar.broker.service;
 
+import static org.apache.pulsar.compaction.Compactor.COMPACTION_SUBSCRIPTION;
 import com.google.common.collect.Sets;
 import java.net.URL;
+import java.time.Duration;
 import java.util.Collections;
 import java.util.Optional;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.common.naming.SystemTopicNames;
 import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.TenantInfoImpl;
 import org.apache.pulsar.common.policies.data.TopicType;
+import org.apache.pulsar.common.policies.data.stats.TopicStatsImpl;
 import org.apache.pulsar.tests.TestRetrySupport;
 import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
 import org.apache.pulsar.zookeeper.ZookeeperServerTest;
+import org.awaitility.Awaitility;
+import org.testng.Assert;
 
 @Slf4j
 public abstract class OneWayReplicatorTestBase extends TestRetrySupport {
@@ -140,10 +147,32 @@ public abstract class OneWayReplicatorTestBase extends 
TestRetrySupport {
     }
 
     protected void cleanupTopics(CleanupTopicAction cleanupTopicAction) throws 
Exception {
+        waitChangeEventsInit(defaultNamespace);
         admin1.namespaces().setNamespaceReplicationClusters(defaultNamespace, 
Collections.singleton(cluster1));
         admin1.namespaces().unload(defaultNamespace);
         cleanupTopicAction.run();
         admin1.namespaces().setNamespaceReplicationClusters(defaultNamespace, 
Sets.newHashSet(cluster1, cluster2));
+        waitChangeEventsInit(defaultNamespace);
+    }
+
+    /**
+     * Waits up to 180 seconds until every non-durable, non-compaction subscription of the
+     * namespace's local change-events system topic on {@code pulsar1} has no message backlog.
+     */
+    protected void waitChangeEventsInit(String namespace) {
+        PersistentTopic topic = (PersistentTopic) pulsar1.getBrokerService()
+                .getTopic(namespace + "/" + SystemTopicNames.NAMESPACE_EVENTS_LOCAL_NAME, false)
+                .join().get();
+        Awaitility.await().atMost(Duration.ofSeconds(180)).untilAsserted(() -> {
+            TopicStatsImpl topicStats = topic.getStats(true, false, false);
+            topicStats.getSubscriptions().forEach((subscriptionName, subscriptionStats) -> {
+                // The compaction subscription and durable subscriptions are not waited on.
+                if (!COMPACTION_SUBSCRIPTION.equals(subscriptionName) && !subscriptionStats.isDurable()) {
+                    Assert.assertEquals(subscriptionStats.getMsgBacklog(), 0L,
+                            "Backlog of subscription " + subscriptionName);
+                }
+            });
+        });
     }
 
     protected interface CleanupTopicAction {
@@ -166,7 +195,7 @@ public abstract class OneWayReplicatorTestBase extends 
TestRetrySupport {
         log.info("--- OneWayReplicatorTestBase::setup completed ---");
     }
 
-    private void setConfigDefaults(ServiceConfiguration config, String 
clusterName,
+    protected void setConfigDefaults(ServiceConfiguration config, String 
clusterName,
                                    LocalBookkeeperEnsemble bookkeeperEnsemble, 
ZookeeperServerTest brokerConfigZk) {
         config.setClusterName(clusterName);
         config.setAdvertisedAddress("localhost");
@@ -185,10 +214,19 @@ public abstract class OneWayReplicatorTestBase extends 
TestRetrySupport {
         config.setAllowAutoTopicCreationType(TopicType.NON_PARTITIONED);
         config.setEnableReplicatedSubscriptions(true);
         config.setReplicatedSubscriptionsSnapshotFrequencyMillis(1000);
+        config.setLoadBalancerSheddingEnabled(false);
     }
 
     @Override
     protected void cleanup() throws Exception {
+        // delete namespaces.
+        waitChangeEventsInit(defaultNamespace);
+        admin1.namespaces().setNamespaceReplicationClusters(defaultNamespace, 
Sets.newHashSet(cluster1));
+        admin1.namespaces().deleteNamespace(defaultNamespace);
+        admin2.namespaces().setNamespaceReplicationClusters(defaultNamespace, 
Sets.newHashSet(cluster2));
+        admin2.namespaces().deleteNamespace(defaultNamespace);
+
+        // shutdown.
         markCurrentSetupNumberCleaned();
         log.info("--- Shutting down ---");
 
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
index 04bf36eaa66..412dfb381c8 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
@@ -1799,12 +1799,12 @@ public class PersistentTopicTest extends 
MockedBookKeeperTestCase {
                         any(), eq(null)
                 );
 
-        replicator.disconnect(false);
-        replicator.disconnect(false);
+        replicator.terminate();
+        replicator.terminate();
 
         replicator.startProducer();
 
-        verify(clientImpl, Mockito.times(2)).createProducerAsync(any(), any(), 
any());
+        verify(clientImpl, Mockito.times(1)).createProducerAsync(any(), any(), 
any());
     }
 
     @Test
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
index 6fe6a47cab9..c809a2e371f 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
@@ -897,7 +897,7 @@ public class ReplicatorTest extends ReplicatorTestBase {
         pulsar2 = null;
         pulsar3.close();
         pulsar3 = null;
-        replicator.disconnect(false);
+        replicator.terminate();
         Thread.sleep(100);
         Field field = AbstractReplicator.class.getDeclaredField("producer");
         field.setAccessible(true);
@@ -1836,7 +1836,7 @@ public class ReplicatorTest extends ReplicatorTestBase {
         persistentTopic.getReplicators().forEach((cluster, replicator) -> {
             PersistentReplicator persistentReplicator = (PersistentReplicator) 
replicator;
             // Pause replicator
-            persistentReplicator.disconnect();
+            pauseReplicator(persistentReplicator);
         });
 
         persistentProducer1.send("V2".getBytes());
@@ -1876,4 +1876,18 @@ public class ReplicatorTest extends ReplicatorTestBase {
 
         assertEquals(result, Lists.newArrayList("V1", "V2", "V3", "V4"));
     }
+
+    /**
+     * Pauses the given replicator by closing its producer, after first waiting (with the default
+     * Awaitility timeout) until the replicator reports itself as connected.
+     *
+     * <p>The close is awaited so the replicator is actually paused before the caller continues,
+     * e.g. by publishing further messages.
+     */
+    private void pauseReplicator(PersistentReplicator replicator) {
+        Awaitility.await().untilAsserted(() -> {
+            assertTrue(replicator.isConnected());
+        });
+        replicator.closeProducerAsync(true).join();
+    }
 }

Reply via email to