codelipenghui commented on code in PR #21946:
URL: https://github.com/apache/pulsar/pull/21946#discussion_r1533581014
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java:
##########
@@ -188,58 +274,134 @@ protected CompletableFuture<Boolean> isLocalTopicActive() {
}, brokerService.executor());
}
- protected synchronized CompletableFuture<Void> closeProducerAsync() {
- if (producer == null) {
- STATE_UPDATER.set(this, State.Stopped);
+ /**
+ * @Deprecated This method only be used by {@link PersistentTopic#checkGC} now.
+ * TODO "PersistentReplicator.replicateEntries" may get a NullPointerException if this method and a
+ * "cursor.readComplete" execute concurrently.
+ */
+ @Deprecated
Review Comment:
I think we can leave a TODO there to refactor the code.
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java:
##########
@@ -188,58 +274,134 @@ protected CompletableFuture<Boolean> isLocalTopicActive() {
}, brokerService.executor());
}
- protected synchronized CompletableFuture<Void> closeProducerAsync() {
- if (producer == null) {
- STATE_UPDATER.set(this, State.Stopped);
+ /**
+ * @Deprecated This method only be used by {@link PersistentTopic#checkGC} now.
+ * TODO "PersistentReplicator.replicateEntries" may get a NullPointerException if this method and a
+ * "cursor.readComplete" execute concurrently.
+ */
+ @Deprecated
Review Comment:
If it is still used in `PersistentTopic#checkGC`, I don't think we should deprecate this method. Do we have any other alternative for `PersistentTopic#checkGC`?
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java:
##########
@@ -64,10 +70,35 @@ public abstract class AbstractReplicator {
protected static final AtomicReferenceFieldUpdater<AbstractReplicator, State> STATE_UPDATER =
AtomicReferenceFieldUpdater.newUpdater(AbstractReplicator.class, State.class, "state");
- private volatile State state = State.Stopped;
-
- protected enum State {
- Stopped, Starting, Started, Stopping
+ @VisibleForTesting
+ @Getter
+ protected volatile State state = State.Stopped;
+
+ public enum State {
+ /**
+ * This enum has two mean meanings:Init, Stopped.
+ * Regarding the meaning "Stopped", only {@link PersistentTopic#checkGC} will call {@link #disconnect},
+ * so this method only be used by {@link PersistentTopic#checkGC} now.
+ * TODO After improving the method {@link #disconnect)}, we should rename "Stopped" to "Init".
+ */
+ // The internal producer is stopped.
+ Stopped,
+ // Trying to create a new internal producer.
+ Starting,
+ // The internal producer has started, and tries copy data.
+ Started,
+ /**
+ * @Deprecated Only {@link PersistentTopic#checkGC} will call {@link #disconnect}, so this method only be
+ * used by {@link PersistentTopic#checkGC} now.
+ * TODO After improving the method {@link #disconnect)}, this enum should be removed.
+ */
+ @Deprecated
+ // The internal producer is trying to stop.
+ Stopping,
Review Comment:
We still use `Stopping` when disconnecting the replicator. Why do we need to add the `@Deprecated` annotation?
BTW, would it be better to rename `Stopping` and `Stopped` to `Disconnecting` and `Disconnected`? Honestly, it's not easy to understand the difference between stop and terminate, but "disconnecting" is clearer in this case.
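A minimal sketch of the suggested renaming, assuming the transitions stay as in this PR: `Stopped`/`Stopping` become `Disconnected`/`Disconnecting`, and `Terminating`/`Terminated` are the states referenced in `startProducer`. `ReplicatorStateSketch`, its `Result` record, and this `compareSetAndGetState` are hypothetical stand-ins (the PR's helper returns a `Pair<Boolean, State>`), not the actual Pulsar code; records need Java 16+.

```java
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;

public class ReplicatorStateSketch {

    // Hypothetical renaming: "Stopped" -> "Disconnected", "Stopping" -> "Disconnecting".
    public enum State {
        // No internal producer is connected (also the initial state).
        Disconnected,
        // A task is creating the internal producer.
        Starting,
        // The internal producer is connected and copying data.
        Started,
        // The internal producer is being closed.
        Disconnecting,
        // The replicator is being closed permanently.
        Terminating,
        // The replicator is closed permanently.
        Terminated
    }

    // Outcome of a CAS attempt: whether it succeeded, and the state after the call.
    public record Result(boolean updated, State observed) { }

    private static final AtomicReferenceFieldUpdater<ReplicatorStateSketch, State> STATE_UPDATER =
            AtomicReferenceFieldUpdater.newUpdater(ReplicatorStateSketch.class, State.class, "state");

    private volatile State state = State.Disconnected;

    // Atomically moves from 'expect' to 'update'; on failure reports the state that blocked the move.
    public Result compareSetAndGetState(State expect, State update) {
        while (true) {
            if (STATE_UPDATER.compareAndSet(this, expect, update)) {
                return new Result(true, update);
            }
            State current = STATE_UPDATER.get(this);
            if (current != expect) {
                return new Result(false, current);
            }
            // The state changed back to 'expect' between the CAS and the read; try again.
        }
    }

    public State getState() {
        return state;
    }
}
```

With this naming, a failed `Disconnected -> Starting` CAS that observes `Disconnecting` reads as "the producer is still disconnecting, retry later", which matches the branch in `startProducer` that calls `delayStartProducerAfterStopped()`.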
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java:
##########
@@ -188,58 +274,134 @@ protected CompletableFuture<Boolean> isLocalTopicActive() {
}, brokerService.executor());
}
- protected synchronized CompletableFuture<Void> closeProducerAsync() {
- if (producer == null) {
- STATE_UPDATER.set(this, State.Stopped);
+ /**
+ * @Deprecated This method only be used by {@link PersistentTopic#checkGC} now.
+ * TODO "PersistentReplicator.replicateEntries" may get a NullPointerException if this method and a
+ * "cursor.readComplete" execute concurrently.
+ */
+ @Deprecated
+ public CompletableFuture<Void> disconnect(boolean failIfHasBacklog, boolean closeTheStartingProducer) {
+ long backlog = getNumberOfEntriesInBacklog();
+ if (failIfHasBacklog && backlog > 0) {
+ CompletableFuture<Void> disconnectFuture = new CompletableFuture<>();
+ disconnectFuture.completeExceptionally(new TopicBusyException("Cannot close a replicator with backlog"));
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] Replicator disconnect failed since topic has backlog", replicatorId);
+ }
+ return disconnectFuture;
+ }
+ log.info("[{}] Disconnect replicator at position {} with backlog {}", replicatorId,
+ getReplicatorReadPosition(), backlog);
+ return closeProducerAsync(closeTheStartingProducer);
+ }
+
+ /**
+ * @Deprecated This method only be used by {@link PersistentTopic#checkGC} now.
+ * TODO "PersistentReplicator.replicateEntries" may get a NullPointerException if this method and a
+ * "cursor.readComplete" execute concurrently.
+ */
+ @Deprecated
Review Comment:
Same as the above comment.
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java:
##########
@@ -101,78 +132,133 @@ public AbstractReplicator(String localCluster, Topic localTopic, String remoteCl
protected abstract String getProducerName();
- protected abstract void readEntries(org.apache.pulsar.client.api.Producer<byte[]> producer);
+ protected abstract void setProducerAndTriggerReadEntries(org.apache.pulsar.client.api.Producer<byte[]> producer);
protected abstract Position getReplicatorReadPosition();
- protected abstract long getNumberOfEntriesInBacklog();
+ public abstract long getNumberOfEntriesInBacklog();
protected abstract void disableReplicatorRead();
public String getRemoteCluster() {
return remoteCluster;
}
- // This method needs to be synchronized with disconnects else if there is a disconnect followed by startProducer
- // the end result can be disconnect.
- public synchronized void startProducer() {
- if (STATE_UPDATER.get(this) == State.Stopping) {
- long waitTimeMs = backOff.next();
- if (log.isDebugEnabled()) {
- log.debug(
- "[{}] waiting for producer to close before attempting to reconnect, retrying in {} s",
- replicatorId, waitTimeMs / 1000.0);
- }
- // BackOff before retrying
- brokerService.executor().schedule(this::checkTopicActiveAndRetryStartProducer, waitTimeMs,
- TimeUnit.MILLISECONDS);
- return;
- }
- State state = STATE_UPDATER.get(this);
- if (!STATE_UPDATER.compareAndSet(this, State.Stopped, State.Starting)) {
- if (state == State.Started) {
- // Already running
+ public void startProducer() {
+ // Guarantee only one task call "producerBuilder.createAsync()".
+ Pair<Boolean, State> setStartingRes = compareSetAndGetState(State.Stopped, State.Starting);
+ if (!setStartingRes.getLeft()) {
+ if (setStartingRes.getRight() == State.Starting) {
+ log.info("[{}] Skip the producer creation since other thread is doing starting, state : {}",
+ replicatorId, state);
+ } else if (setStartingRes.getRight() == State.Started) {
+ // Since the method "startProducer" will be called even if it is started, only print debug-level log.
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] Replicator was already running. state: {}", replicatorId, state);
+ }
+ } else if (setStartingRes.getRight() == State.Stopping) {
if (log.isDebugEnabled()) {
- log.debug("[{}] Replicator was already running", replicatorId);
+ log.debug("[{}] Rep.producer is closing, delay to retry(wait the producer close success)."
+ + " state: {}", replicatorId, state);
}
+ delayStartProducerAfterStopped();
} else {
- log.info("[{}] Replicator already being started. Replicator state: {}", replicatorId, state);
+ /** {@link State.Terminating}, {@link State.Terminated}. **/
+ log.info("[{}] Skip the producer creation since the replicator state is : {}", replicatorId, state);
}
-
return;
}
log.info("[{}] Starting replicator", replicatorId);
producerBuilder.createAsync().thenAccept(producer -> {
- readEntries(producer);
+ setProducerAndTriggerReadEntries(producer);
Review Comment:
Actually, this method can't ensure that the replicator is started successfully. It just closes the producer internally and returns, so we will not get any exception in this case.
Do we need to handle this case? https://github.com/apache/pulsar/pull/21946/files#diff-16c47c755ed17a62606a6b9a6b1dc65d5fe0baed21f6e3541df264bec81b83f6R168-R171
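One possible way to handle it, sketched under assumptions: the method reports the discarded producer through its returned future instead of closing it silently, so the callback chain in `startProducer` can react (log, retry, or ignore it for `Terminating`/`Terminated`). `StartProducerSketch`, `FakeProducer`, and the `CompletableFuture<Void>` return type are hypothetical; the method in this PR returns `void` and takes an `org.apache.pulsar.client.api.Producer<byte[]>`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

public class StartProducerSketch {

    // States as named in this PR.
    enum State { Stopped, Starting, Started, Stopping, Terminating, Terminated }

    // Hypothetical stand-in for org.apache.pulsar.client.api.Producer<byte[]>.
    interface FakeProducer {
        CompletableFuture<Void> closeAsync();
    }

    // startProducer has already moved the state to Starting before creating the producer.
    private final AtomicReference<State> state = new AtomicReference<>(State.Starting);
    private volatile FakeProducer producer;

    // Hypothetical variant: if the state left Starting while the producer was being created
    // (e.g. the replicator is terminating), close the new producer and fail the returned
    // future instead of returning silently, so the caller sees why the start did not happen.
    CompletableFuture<Void> setProducerAndTriggerReadEntries(FakeProducer newProducer) {
        if (!state.compareAndSet(State.Starting, State.Started)) {
            State current = state.get();
            return newProducer.closeAsync().thenCompose(ignored -> CompletableFuture.<Void>failedFuture(
                    new IllegalStateException("Replicator state changed to " + current
                            + " while the producer was being created")));
        }
        producer = newProducer;
        // The real replicator would trigger the first read of entries here.
        return CompletableFuture.completedFuture(null);
    }

    State getState() {
        return state.get();
    }

    void setState(State newState) {
        state.set(newState);
    }
}
```

The producer is still closed inside the method, so nothing leaks; the only change is that the outcome becomes observable to the caller.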
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java:
##########
@@ -451,7 +472,7 @@ public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
}
HAVE_PENDING_READ_UPDATER.set(this, FALSE);
- brokerService.executor().schedule(this::readMoreEntries, waitTimeMillis, TimeUnit.MILLISECONDS);
+ brokerService.executor().schedule(() -> readMoreEntries(), waitTimeMillis, TimeUnit.MILLISECONDS);
Review Comment:
We don't need this change.
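For reference, the method reference and the lambda behave the same here, assuming `readMoreEntries` is a single no-argument `void` method: both resolve to `ScheduledExecutorService.schedule(Runnable, long, TimeUnit)` and invoke the same method on `this`. `ScheduleEquivalenceSketch` is a hypothetical stand-in for `PersistentReplicator`.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class ScheduleEquivalenceSketch {
    private final AtomicInteger reads = new AtomicInteger();

    // Hypothetical stand-in for PersistentReplicator#readMoreEntries.
    void readMoreEntries() {
        reads.incrementAndGet();
    }

    // Schedules the method once with each form and waits for both runs to finish.
    int runBoth() throws Exception {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        try {
            // Both forms are Runnables that call readMoreEntries() on this instance.
            executor.schedule(this::readMoreEntries, 10, TimeUnit.MILLISECONDS).get();
            executor.schedule(() -> readMoreEntries(), 10, TimeUnit.MILLISECONDS).get();
        } finally {
            executor.shutdown();
        }
        return reads.get();
    }
}
```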
--
This is an automated message from the Apache Git Service.