gaoran10 commented on code in PR #21946:
URL: https://github.com/apache/pulsar/pull/21946#discussion_r1565113797


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java:
##########
@@ -188,58 +270,135 @@ protected CompletableFuture<Boolean> 
isLocalTopicActive() {
         }, brokerService.executor());
     }
 
-    protected synchronized CompletableFuture<Void> closeProducerAsync() {
-        if (producer == null) {
-            STATE_UPDATER.set(this, State.Stopped);
+    /**
+     * This method only be used by {@link PersistentTopic#checkGC} now.
+     */
+    public CompletableFuture<Void> disconnect(boolean failIfHasBacklog, 
boolean closeTheStartingProducer) {
+        long backlog = getNumberOfEntriesInBacklog();
+        if (failIfHasBacklog && backlog > 0) {
+            CompletableFuture<Void> disconnectFuture = new 
CompletableFuture<>();
+            disconnectFuture.completeExceptionally(new 
TopicBusyException("Cannot close a replicator with backlog"));
+            if (log.isDebugEnabled()) {
+                log.debug("[{}] Replicator disconnect failed since topic has 
backlog", replicatorId);
+            }
+            return disconnectFuture;
+        }
+        log.info("[{}] Disconnect replicator at position {} with backlog {}", 
replicatorId,
+                getReplicatorReadPosition(), backlog);
+        return closeProducerAsync(closeTheStartingProducer);
+    }
+
+    /**
+     * This method only be used by {@link PersistentTopic#checkGC} now.
+     */
+    protected CompletableFuture<Void> closeProducerAsync(boolean 
closeTheStartingProducer) {
+        Pair<Boolean, State> setDisconnectingRes = 
compareSetAndGetState(State.Started, State.Disconnecting);
+        if (!setDisconnectingRes.getLeft()) {
+            if (setDisconnectingRes.getRight() == State.Starting) {
+                if (closeTheStartingProducer) {
+                    /**
+                     * Delay retry(wait for the start producer task is finish).
+                     * Note: If the producer always start fail, the start 
producer task will always retry until the
+                     *   state changed to {@link State.Terminated}.
+                     *   Nit: The better solution is creating a {@link 
CompletableFuture} to trace the in-progress
+                     *     creation and call 
"inProgressCreationFuture.thenApply(closeProducer())".
+                     */
+                    long waitTimeMs = backOff.next();
+                    brokerService.executor().schedule(() -> 
closeProducerAsync(true),
+                            waitTimeMs, TimeUnit.MILLISECONDS);
+                } else {
+                    log.info("[{}] Skip current producer closing since the 
previous producer has been closed,"
+                                    + " and trying start a new one, state : 
{}",
+                            replicatorId, setDisconnectingRes.getRight());
+                }
+            } else if (setDisconnectingRes.getRight() == State.Disconnected
+                    || setDisconnectingRes.getRight() == State.Disconnecting) {
+                log.info("[{}] Skip current producer closing since other 
thread did closing, state : {}",
+                        replicatorId, setDisconnectingRes.getRight());
+            } else if (setDisconnectingRes.getRight() == State.Terminating
+                    || setDisconnectingRes.getRight() == State.Terminated) {
+                log.info("[{}] Skip current producer closing since other 
thread is doing termination, state : {}",
+                        replicatorId, state);
+            }
+            log.info("[{}] Skip current termination since other thread is 
doing close producer or termination,"
+                            + " state : {}", replicatorId, state);
             return CompletableFuture.completedFuture(null);
         }
-        CompletableFuture<Void> future = producer.closeAsync();
+
+        // Close producer and update state.
+        return doCloseProducerAsync(producer, () -> {
+            Pair<Boolean, State> setDisconnectedRes = 
compareSetAndGetState(State.Disconnecting, State.Disconnected);
+            if (setDisconnectedRes.getLeft()) {
+                this.producer = null;
+                // deactivate further read
+                disableReplicatorRead();
+                return;
+            }
+            if (setDisconnectedRes.getRight() == State.Terminating
+                    || setDisconnectingRes.getRight() == State.Terminated) {
+                log.info("[{}] Skip setting state to terminated because it was 
terminated, state : {}",
+                        replicatorId, state);
+            } else {
+                // Since only one task can call 
"doCloseProducerAsync(producer, action)", this scenario is not expected.
+                // So print a warn log.
+                log.warn("[{}] Other task has change the state to terminated. 
so skipped current one task."
+                                + " State is : {}",
+                        replicatorId, state);
+            }
+        });
+    }
+
+    protected CompletableFuture<Void> doCloseProducerAsync(Producer<byte[]> 
producer, Runnable actionAfterClosed) {
+        CompletableFuture<Void> future =
+                producer == null ? CompletableFuture.completedFuture(null) : 
producer.closeAsync();
         return future.thenRun(() -> {
-            STATE_UPDATER.set(this, State.Stopped);
-            this.producer = null;
-            // deactivate further read
-            disableReplicatorRead();
+            actionAfterClosed.run();
         }).exceptionally(ex -> {
             long waitTimeMs = backOff.next();
             log.warn(
-                    "[{}] Exception: '{}' occurred while trying to close the 
producer."
-                            + " retrying again in {} s",
-                    replicatorId, ex.getMessage(), waitTimeMs / 1000.0);
+                    "[{}] Exception: '{}' occurred while trying to close the 
producer. Replicator state: {}."
+                            + " Retrying again in {} s.",
+                    replicatorId, ex.getMessage(), state, waitTimeMs / 1000.0);
             // BackOff before retrying
-            brokerService.executor().schedule(this::closeProducerAsync, 
waitTimeMs, TimeUnit.MILLISECONDS);
+            brokerService.executor().schedule(() -> 
doCloseProducerAsync(producer, actionAfterClosed),
+                    waitTimeMs, TimeUnit.MILLISECONDS);
             return null;
         });
     }
 
+    public CompletableFuture<Void> terminate() {
+        if (!tryChangeStatusToTerminating()) {

Review Comment:
   Got it, thanks.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to