poorbarcode commented on code in PR #21946:
URL: https://github.com/apache/pulsar/pull/21946#discussion_r1536576438


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java:
##########
@@ -101,78 +132,133 @@ public AbstractReplicator(String localCluster, Topic localTopic, String remoteCl
 
     protected abstract String getProducerName();
 
-    protected abstract void readEntries(org.apache.pulsar.client.api.Producer<byte[]> producer);
+    protected abstract void setProducerAndTriggerReadEntries(org.apache.pulsar.client.api.Producer<byte[]> producer);
 
     protected abstract Position getReplicatorReadPosition();
 
-    protected abstract long getNumberOfEntriesInBacklog();
+    public abstract long getNumberOfEntriesInBacklog();
 
     protected abstract void disableReplicatorRead();
 
     public String getRemoteCluster() {
         return remoteCluster;
     }
 
-    // This method needs to be synchronized with disconnects else if there is a disconnect followed by startProducer
-    // the end result can be disconnect.
-    public synchronized void startProducer() {
-        if (STATE_UPDATER.get(this) == State.Stopping) {
-            long waitTimeMs = backOff.next();
-            if (log.isDebugEnabled()) {
-                log.debug(
-                        "[{}] waiting for producer to close before attempting to reconnect, retrying in {} s",
-                        replicatorId, waitTimeMs / 1000.0);
-            }
-            // BackOff before retrying
-            brokerService.executor().schedule(this::checkTopicActiveAndRetryStartProducer, waitTimeMs,
-                    TimeUnit.MILLISECONDS);
-            return;
-        }
-        State state = STATE_UPDATER.get(this);
-        if (!STATE_UPDATER.compareAndSet(this, State.Stopped, State.Starting)) {
-            if (state == State.Started) {
-                // Already running
+    public void startProducer() {
+        // Guarantee only one task call "producerBuilder.createAsync()".
+        Pair<Boolean, State> setStartingRes = compareSetAndGetState(State.Stopped, State.Starting);
+        if (!setStartingRes.getLeft()) {
+            if (setStartingRes.getRight() == State.Starting) {
+                log.info("[{}] Skip the producer creation since other thread is doing starting, state : {}",
+                        replicatorId, state);
+            } else if (setStartingRes.getRight() == State.Started) {
+                // Since the method "startProducer" will be called even if it is started, only print debug-level log.
+                if (log.isDebugEnabled()) {
+                    log.debug("[{}] Replicator was already running. state: {}", replicatorId, state);
+                }
+            } else if (setStartingRes.getRight() == State.Stopping) {
                 if (log.isDebugEnabled()) {
-                    log.debug("[{}] Replicator was already running", replicatorId);
+                    log.debug("[{}] Rep.producer is closing, delay to retry(wait the producer close success)."
+                            + " state: {}", replicatorId, state);
                 }
+                delayStartProducerAfterStopped();
             } else {
-                log.info("[{}] Replicator already being started. Replicator state: {}", replicatorId, state);
+                /** {@link State.Terminating}, {@link State.Terminated}. **/
+                log.info("[{}] Skip the producer creation since the replicator state is : {}", replicatorId, state);
             }
-
             return;
         }
 
         log.info("[{}] Starting replicator", replicatorId);
         producerBuilder.createAsync().thenAccept(producer -> {
-            readEntries(producer);
+            setProducerAndTriggerReadEntries(producer);

Review Comment:
   > Actually, this method can't ensure that the replicator is started successfully. It just closes the producer internally and returns, so we will not get any exception in this case.
   
   There are two logic branches:
   - Changing the state to `Started` succeeds: set the `producer` variable and trigger a new `readMoreEntries`.
   - Changing the state to `Started` fails: close the newly created producer, then return.
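   The two branches can be illustrated with a minimal Java sketch. It assumes a plain `AtomicReference<State>` instead of the broker's `STATE_UPDATER`, a hypothetical `FakeProducer` in place of `org.apache.pulsar.client.api.Producer`, and a counter in place of `readMoreEntries()`. The exact CAS (`Starting` to `Started`) is also an assumption, so this shows the idea rather than the PR's code.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in for org.apache.pulsar.client.api.Producer; it only records whether it was closed.
class FakeProducer {
    volatile boolean closed = false;

    void closeAsync() {
        closed = true;
    }
}

// Hypothetical, simplified replicator used only to illustrate the two branches; NOT Pulsar's implementation.
class ReplicatorSketch {
    enum State { Stopped, Starting, Started, Stopping, Terminating, Terminated }

    final AtomicReference<State> state = new AtomicReference<>(State.Stopped);
    volatile FakeProducer producer;
    final AtomicInteger readTriggers = new AtomicInteger();

    // Assumed to be invoked from the producerBuilder.createAsync() callback.
    void setProducerAndTriggerReadEntries(FakeProducer newProducer) {
        if (state.compareAndSet(State.Starting, State.Started)) {
            // Branch 1: this thread owns the transition, so it publishes the producer and starts reading.
            producer = newProducer;
            readTriggers.incrementAndGet(); // stands in for readMoreEntries()
        } else {
            // Branch 2: another thread changed the state first and owns the follow-up work,
            // so this thread only releases the producer it just created. No exception is raised.
            newProducer.closeAsync();
        }
    }
}
```

   In branch 2 the caller sees no error, which matches the point in the quoted comment: the method does not report failure, because the thread that won the state change is expected to finish the job.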
   
   If a thread enters branch 2, another thread has already changed `state` to a different value, such as `Started` or `Stopped`. The thread that changed `state` successfully is then responsible for the subsequent work.
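   The same ownership rule drives the guard at the top of `startProducer()` in the diff: only the thread whose CAS from `Stopped` to `Starting` succeeds goes on to call `producerBuilder.createAsync()`. The sketch below races several threads against a hypothetical `compareSetAndGetState` that reports the observed state on failure. The `StartGuardSketch` and `CasResult` names (the latter standing in for the diff's `Pair<Boolean, State>`) and the counter replacing `createAsync()` are illustrative assumptions, not Pulsar APIs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch of the "only one task calls createAsync()" guard; NOT Pulsar's implementation.
class StartGuardSketch {
    enum State { Stopped, Starting, Started, Stopping, Terminating, Terminated }

    // Outcome of a CAS attempt: whether it won, and the state observed if it lost.
    static final class CasResult {
        final boolean success;
        final State observed;

        CasResult(boolean success, State observed) {
            this.success = success;
            this.observed = observed;
        }
    }

    final AtomicReference<State> state = new AtomicReference<>(State.Stopped);
    final AtomicInteger createCalls = new AtomicInteger();

    // Hypothetical counterpart of compareSetAndGetState(expect, update) from the diff.
    CasResult compareSetAndGetState(State expect, State update) {
        while (true) {
            if (state.compareAndSet(expect, update)) {
                return new CasResult(true, expect);
            }
            State current = state.get();
            if (current != expect) {
                return new CasResult(false, current);
            }
            // The state went back to `expect` between the failed CAS and the read; try again.
        }
    }

    void startProducer() {
        CasResult res = compareSetAndGetState(State.Stopped, State.Starting);
        if (!res.success) {
            // Starting, Started, Stopping, Terminating or Terminated: another path owns the follow-up work.
            return;
        }
        createCalls.incrementAndGet(); // stands in for producerBuilder.createAsync()
    }

    // Starts `threads` threads that call startProducer() at roughly the same moment.
    static StartGuardSketch race(int threads) {
        StartGuardSketch replicator = new StartGuardSketch();
        CountDownLatch go = new CountDownLatch(1);
        List<Thread> workers = new ArrayList<>();
        for (int i = 0; i < threads; i++) {
            Thread t = new Thread(() -> {
                try {
                    go.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
                replicator.startProducer();
            });
            workers.add(t);
            t.start();
        }
        go.countDown();
        try {
            for (Thread t : workers) {
                t.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return replicator;
    }
}
```

   However many threads race, exactly one wins the CAS and is responsible for creating the producer; the others observe `Starting` (or a later state) and return without side effects.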



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
