poorbarcode commented on code in PR #21946:
URL: https://github.com/apache/pulsar/pull/21946#discussion_r1564800410
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java:
##########
@@ -188,58 +270,135 @@ protected CompletableFuture<Boolean>
isLocalTopicActive() {
}, brokerService.executor());
}
- protected synchronized CompletableFuture<Void> closeProducerAsync() {
- if (producer == null) {
- STATE_UPDATER.set(this, State.Stopped);
+ /**
+ * This method only be used by {@link PersistentTopic#checkGC} now.
+ */
+ public CompletableFuture<Void> disconnect(boolean failIfHasBacklog,
boolean closeTheStartingProducer) {
+ long backlog = getNumberOfEntriesInBacklog();
+ if (failIfHasBacklog && backlog > 0) {
+ CompletableFuture<Void> disconnectFuture = new
CompletableFuture<>();
+ disconnectFuture.completeExceptionally(new
TopicBusyException("Cannot close a replicator with backlog"));
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] Replicator disconnect failed since topic has
backlog", replicatorId);
+ }
+ return disconnectFuture;
+ }
+ log.info("[{}] Disconnect replicator at position {} with backlog {}",
replicatorId,
+ getReplicatorReadPosition(), backlog);
+ return closeProducerAsync(closeTheStartingProducer);
+ }
+
+ /**
+ * This method only be used by {@link PersistentTopic#checkGC} now.
+ */
+ protected CompletableFuture<Void> closeProducerAsync(boolean
closeTheStartingProducer) {
+ Pair<Boolean, State> setDisconnectingRes =
compareSetAndGetState(State.Started, State.Disconnecting);
+ if (!setDisconnectingRes.getLeft()) {
+ if (setDisconnectingRes.getRight() == State.Starting) {
+ if (closeTheStartingProducer) {
+ /**
+ * Delay retry(wait for the start producer task is finish).
+ * Note: If the producer always start fail, the start
producer task will always retry until the
+ * state changed to {@link State.Terminated}.
+ * Nit: The better solution is creating a {@link
CompletableFuture} to trace the in-progress
+ * creation and call
"inProgressCreationFuture.thenApply(closeProducer())".
+ */
+ long waitTimeMs = backOff.next();
+ brokerService.executor().schedule(() ->
closeProducerAsync(true),
+ waitTimeMs, TimeUnit.MILLISECONDS);
+ } else {
+ log.info("[{}] Skip current producer closing since the
previous producer has been closed,"
+ + " and trying start a new one, state :
{}",
+ replicatorId, setDisconnectingRes.getRight());
+ }
+ } else if (setDisconnectingRes.getRight() == State.Disconnected
+ || setDisconnectingRes.getRight() == State.Disconnecting) {
+ log.info("[{}] Skip current producer closing since other
thread did closing, state : {}",
+ replicatorId, setDisconnectingRes.getRight());
+ } else if (setDisconnectingRes.getRight() == State.Terminating
+ || setDisconnectingRes.getRight() == State.Terminated) {
+ log.info("[{}] Skip current producer closing since other
thread is doing termination, state : {}",
+ replicatorId, state);
+ }
+ log.info("[{}] Skip current termination since other thread is
doing close producer or termination,"
+ + " state : {}", replicatorId, state);
return CompletableFuture.completedFuture(null);
}
- CompletableFuture<Void> future = producer.closeAsync();
+
+ // Close producer and update state.
+ return doCloseProducerAsync(producer, () -> {
+ Pair<Boolean, State> setDisconnectedRes =
compareSetAndGetState(State.Disconnecting, State.Disconnected);
+ if (setDisconnectedRes.getLeft()) {
+ this.producer = null;
+ // deactivate further read
+ disableReplicatorRead();
+ return;
+ }
+ if (setDisconnectedRes.getRight() == State.Terminating
+ || setDisconnectingRes.getRight() == State.Terminated) {
+ log.info("[{}] Skip setting state to terminated because it was
terminated, state : {}",
+ replicatorId, state);
+ } else {
+ // Since only one task can call
"doCloseProducerAsync(producer, action)", this scenario is not expected.
+ // So print a warn log.
+ log.warn("[{}] Other task has change the state to terminated.
so skipped current one task."
+ + " State is : {}",
+ replicatorId, state);
+ }
+ });
+ }
+
+ protected CompletableFuture<Void> doCloseProducerAsync(Producer<byte[]>
producer, Runnable actionAfterClosed) {
+ CompletableFuture<Void> future =
+ producer == null ? CompletableFuture.completedFuture(null) :
producer.closeAsync();
return future.thenRun(() -> {
- STATE_UPDATER.set(this, State.Stopped);
- this.producer = null;
- // deactivate further read
- disableReplicatorRead();
+ actionAfterClosed.run();
}).exceptionally(ex -> {
long waitTimeMs = backOff.next();
log.warn(
- "[{}] Exception: '{}' occurred while trying to close the
producer."
- + " retrying again in {} s",
- replicatorId, ex.getMessage(), waitTimeMs / 1000.0);
+ "[{}] Exception: '{}' occurred while trying to close the
producer. Replicator state: {}."
+ + " Retrying again in {} s.",
+ replicatorId, ex.getMessage(), state, waitTimeMs / 1000.0);
// BackOff before retrying
- brokerService.executor().schedule(this::closeProducerAsync,
waitTimeMs, TimeUnit.MILLISECONDS);
+ brokerService.executor().schedule(() ->
doCloseProducerAsync(producer, actionAfterClosed),
+ waitTimeMs, TimeUnit.MILLISECONDS);
return null;
});
}
+ public CompletableFuture<Void> terminate() {
+ if (!tryChangeStatusToTerminating()) {
Review Comment:
> The method tryChangeStatusToTerminating checks four states, Starting,
Started, Disconnecting, Disconnected, maybe we can only check the states
Terminating and Terminated here.
Since we need `compareAndSet`, we need four logic branches. Also, the states
`Disconnecting` and `Disconnected` will be removed in the future, so the current
method is not complex.
> Another potential problem is that if the state is Terminating, the method
terminate will return a complete future object, but the terminate operation may
not finish.
The method `terminate` will always return a completed future, because we
catch the exception with `exceptionally` in the method `doCloseProducerAsync`
and have added a mechanism to guarantee that the producer is eventually closed.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]