gaoran10 commented on code in PR #21946:
URL: https://github.com/apache/pulsar/pull/21946#discussion_r1565026654
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java:
##########
@@ -134,30 +137,44 @@ public PersistentReplicator(String localCluster,
PersistentTopic localTopic, Man
}
@Override
- protected void readEntries(Producer<byte[]> producer) {
- // Rewind the cursor to be sure to read again all non-acked messages
sent while restarting
+ protected void setProducerAndTriggerReadEntries(Producer<byte[]> producer)
{
+ // Rewind the cursor to be sure to read again all non-acked messages
sent while restarting.
cursor.rewind();
-
cursor.cancelPendingReadRequest();
- HAVE_PENDING_READ_UPDATER.set(this, FALSE);
- this.producer = (ProducerImpl) producer;
- if (STATE_UPDATER.compareAndSet(this, State.Starting, State.Started)) {
- log.info("[{}] Created replicator producer", replicatorId);
+ /**
+ * 1. Try change state to {@link Started}.
+ * 2. Atoms modify multiple properties if change state success, to
avoid another thread get a null value
+ * producer when the state is {@link Started}.
+ */
+ Pair<Boolean, State> changeStateRes;
+ changeStateRes = compareSetAndGetState(Starting, Started);
+ if (changeStateRes.getLeft()) {
+ this.producer = (ProducerImpl) producer;
+ HAVE_PENDING_READ_UPDATER.set(this, FALSE);
+ // Trigger a new read.
+ log.info("[{}] Created replicator producer, Replicator state: {}",
replicatorId, state);
backOff.reset();
- // activate cursor: so, entries can be cached
+ // activate cursor: so, entries can be cached.
this.cursor.setActive();
// read entries
readMoreEntries();
} else {
- log.info(
- "[{}] Replicator was stopped while creating the producer."
- + " Closing it. Replicator state: {}",
- replicatorId, STATE_UPDATER.get(this));
- STATE_UPDATER.set(this, State.Stopping);
- closeProducerAsync();
+ if (changeStateRes.getRight() == Started) {
+ // Since only one task can call
"producerBuilder.createAsync()", this scenario is not expected.
+ // So print a warn log.
+ log.warn("[{}] Replicator was already started by another
thread while creating the producer."
+ + " Closing the producer newly created. Replicator
state: {}", replicatorId, state);
+ } else if (changeStateRes.getRight() == Terminating ||
changeStateRes.getRight() == Terminated) {
+ log.info("[{}] Replicator was terminated, so close the
producer. Replicator state: {}",
Review Comment:
Got it.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]