This is an automated email from the ASF dual-hosted git repository. zhaijia pushed a commit to branch branch-2.5 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 76dfd14f07409a8697cc32482df9b053b1124aaa Author: lipenghui <[email protected]> AuthorDate: Sun Feb 23 19:43:48 2020 +0800 Creating a topic does not wait for creating cursor of replicators (#6364) ### Motivation Creating a topic should not have to wait for the replicators' cursors to be opened; the replicator now opens its cursor asynchronously when it starts. ## Verifying this change The existing unit tests cover this change. (cherry picked from commit 336e971f4d41d6ffb26b3b53a20f36a360c070e8) --- .../pulsar/broker/service/AbstractReplicator.java | 38 +++++++----- .../nonpersistent/NonPersistentReplicator.java | 5 ++ .../service/persistent/PersistentReplicator.java | 71 ++++++++++++++++++++-- .../broker/service/persistent/PersistentTopic.java | 37 ++++------- 4 files changed, 108 insertions(+), 43 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java index d9b2f8e..13cd091 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java @@ -122,30 +122,38 @@ public abstract class AbstractReplicator { log.info("[{}][{} -> {}] Replicator already being started.
Replicator state: {}", topicName, localCluster, remoteCluster, state); } - return; } log.info("[{}][{} -> {}] Starting replicator", topicName, localCluster, remoteCluster); - producerBuilder.createAsync().thenAccept(producer -> { - readEntries(producer); - }).exceptionally(ex -> { - if (STATE_UPDATER.compareAndSet(this, State.Starting, State.Stopped)) { - long waitTimeMs = backOff.next(); - log.warn("[{}][{} -> {}] Failed to create remote producer ({}), retrying in {} s", topicName, - localCluster, remoteCluster, ex.getMessage(), waitTimeMs / 1000.0); - - // BackOff before retrying - brokerService.executor().schedule(this::startProducer, waitTimeMs, TimeUnit.MILLISECONDS); - } else { - log.warn("[{}][{} -> {}] Failed to create remote producer. Replicator state: {}", topicName, - localCluster, remoteCluster, STATE_UPDATER.get(this), ex); - } + openCursorAsync().thenAccept(v -> + producerBuilder.createAsync() + .thenAccept(this::readEntries) + .exceptionally(ex -> { + retryCreateProducer(ex); + return null; + })).exceptionally(ex -> { + retryCreateProducer(ex); return null; }); + } + private void retryCreateProducer(Throwable ex) { + if (STATE_UPDATER.compareAndSet(this, State.Starting, State.Stopped)) { + long waitTimeMs = backOff.next(); + log.warn("[{}][{} -> {}] Failed to create remote producer ({}), retrying in {} s", topicName, + localCluster, remoteCluster, ex.getMessage(), waitTimeMs / 1000.0); + + // BackOff before retrying + brokerService.executor().schedule(this::startProducer, waitTimeMs, TimeUnit.MILLISECONDS); + } else { + log.warn("[{}][{} -> {}] Failed to create remote producer. 
Replicator state: {}", topicName, + localCluster, remoteCluster, STATE_UPDATER.get(this), ex); + } } + protected abstract CompletableFuture<Void> openCursorAsync(); + protected synchronized CompletableFuture<Void> closeProducerAsync() { if (producer == null) { STATE_UPDATER.set(this, State.Stopped); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentReplicator.java index b6ea53a..c109560 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentReplicator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentReplicator.java @@ -252,6 +252,11 @@ public class NonPersistentReplicator extends AbstractReplicator implements Repli } @Override + protected CompletableFuture<Void> openCursorAsync() { + return CompletableFuture.completedFuture(null); + } + + @Override public boolean isConnected() { ProducerImpl<?> producer = this.producer; return producer != null && producer.isConnected(); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java index 2bc18ca..98c5d5a 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java @@ -19,6 +19,7 @@ package org.apache.pulsar.broker.service.persistent; import static org.apache.pulsar.broker.service.persistent.PersistentTopic.MESSAGE_RATE_BACKOFF_MS; + import io.netty.buffer.ByteBuf; import io.netty.util.Recycler; import io.netty.util.Recycler.Handle; @@ -33,11 +34,13 @@ import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import org.apache.bookkeeper.mledger.AsyncCallbacks; import 
org.apache.bookkeeper.mledger.AsyncCallbacks.ClearBacklogCallback; import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteCallback; +import org.apache.bookkeeper.mledger.AsyncCallbacks.OpenCursorCallback; import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback; import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntryCallback; import org.apache.bookkeeper.mledger.Entry; import org.apache.bookkeeper.mledger.ManagedCursor; import org.apache.bookkeeper.mledger.ManagedCursor.IndividualDeletedEntries; +import org.apache.bookkeeper.mledger.ManagedLedger; import org.apache.bookkeeper.mledger.ManagedLedgerException; import org.apache.bookkeeper.mledger.ManagedLedgerException.CursorAlreadyClosedException; import org.apache.bookkeeper.mledger.ManagedLedgerException.TooManyRequestsException; @@ -46,6 +49,7 @@ import org.apache.bookkeeper.mledger.util.Rate; import org.apache.pulsar.broker.service.AbstractReplicator; import org.apache.pulsar.broker.service.BrokerService; import org.apache.pulsar.broker.service.BrokerServiceException.NamingException; +import org.apache.pulsar.broker.service.BrokerServiceException.PersistenceException; import org.apache.pulsar.broker.service.BrokerServiceException.TopicBusyException; import org.apache.pulsar.broker.service.Replicator; import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter.Type; @@ -55,6 +59,7 @@ import org.apache.pulsar.client.impl.Backoff; import org.apache.pulsar.client.impl.MessageImpl; import org.apache.pulsar.client.impl.ProducerImpl; import org.apache.pulsar.client.impl.SendCallback; +import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.InitialPosition; import org.apache.pulsar.common.policies.data.Policies; import org.apache.pulsar.common.api.proto.PulsarMarkers.MarkerType; import org.apache.pulsar.common.policies.data.ReplicatorStats; @@ -65,7 +70,9 @@ import org.slf4j.LoggerFactory; public class PersistentReplicator extends AbstractReplicator implements Replicator, 
ReadEntriesCallback, DeleteCallback { private final PersistentTopic topic; - private final ManagedCursor cursor; + private final String replicatorName; + private final ManagedLedger ledger; + protected ManagedCursor cursor; private Optional<DispatchRateLimiter> dispatchRateLimiter = Optional.empty(); @@ -97,11 +104,14 @@ public class PersistentReplicator extends AbstractReplicator implements Replicat private final ReplicatorStats stats = new ReplicatorStats(); + // Only for test public PersistentReplicator(PersistentTopic topic, ManagedCursor cursor, String localCluster, String remoteCluster, BrokerService brokerService) throws NamingException { super(topic.getName(), topic.getReplicatorPrefix(), localCluster, remoteCluster, brokerService); - this.topic = topic; + this.replicatorName = cursor.getName(); + this.ledger = cursor.getManagedLedger(); this.cursor = cursor; + this.topic = topic; this.expiryMonitor = new PersistentMessageExpiryMonitor(topicName, Codec.decode(cursor.getName()), cursor); HAVE_PENDING_READ_UPDATER.set(this, FALSE); PENDING_MESSAGES_UPDATER.set(this, 0); @@ -116,6 +126,25 @@ public class PersistentReplicator extends AbstractReplicator implements Replicat startProducer(); } + public PersistentReplicator(PersistentTopic topic, String replicatorName, String localCluster, String remoteCluster, + BrokerService brokerService, ManagedLedger ledger) throws NamingException { + super(topic.getName(), topic.getReplicatorPrefix(), localCluster, remoteCluster, brokerService); + this.replicatorName = replicatorName; + this.ledger = ledger; + this.topic = topic; + HAVE_PENDING_READ_UPDATER.set(this, FALSE); + PENDING_MESSAGES_UPDATER.set(this, 0); + + readBatchSize = Math.min( + producerQueueSize, + topic.getBrokerService().pulsar().getConfiguration().getDispatcherMaxReadBatchSize()); + producerQueueThreshold = (int) (producerQueueSize * 0.9); + + this.initializeDispatchRateLimiterIfNeeded(Optional.empty()); + + startProducer(); + } + @Override protected 
void readEntries(org.apache.pulsar.client.api.Producer<byte[]> producer) { // Rewind the cursor to be sure to read again all non-acked messages sent while restarting @@ -158,6 +187,36 @@ public class PersistentReplicator extends AbstractReplicator implements Replicat this.cursor.setInactive(); } + @Override + protected synchronized CompletableFuture<Void> openCursorAsync() { + log.info("[{}][{} -> {}] Starting open cursor for replicator", topicName, localCluster, remoteCluster); + if (cursor != null) { + log.info("[{}][{} -> {}] Using the exists cursor for replicator", topicName, localCluster, remoteCluster); + if (expiryMonitor == null) { + this.expiryMonitor = new PersistentMessageExpiryMonitor(topicName, Codec.decode(cursor.getName()), cursor); + } + return CompletableFuture.completedFuture(null); + } + CompletableFuture<Void> res = new CompletableFuture<>(); + ledger.asyncOpenCursor(replicatorName, InitialPosition.Earliest, new OpenCursorCallback() { + @Override + public void openCursorComplete(ManagedCursor cursor, Object ctx) { + log.info("[{}][{} -> {}] Open cursor succeed for replicator", topicName, localCluster, remoteCluster); + PersistentReplicator.this.cursor = cursor; + PersistentReplicator.this.expiryMonitor = new PersistentMessageExpiryMonitor(topicName, Codec.decode(cursor.getName()), cursor); + res.complete(null); + } + + @Override + public void openCursorFailed(ManagedLedgerException exception, Object ctx) { + log.warn("[{}][{} -> {}] Open cursor failed for replicator", topicName, localCluster, remoteCluster, exception); + res.completeExceptionally(new PersistenceException(exception)); + } + + }, null); + return res; + } + /** * Calculate available permits for read entries. 
@@ -601,7 +660,9 @@ public class PersistentReplicator extends AbstractReplicator implements Replicat msgExpired.calculateRate(); stats.msgRateOut = msgOut.getRate(); stats.msgThroughputOut = msgOut.getValueRate(); - stats.msgRateExpired = msgExpired.getRate() + expiryMonitor.getMessageExpiryRate(); + if (expiryMonitor != null) { + stats.msgRateExpired = msgExpired.getRate() + expiryMonitor.getMessageExpiryRate(); + } } public ReplicatorStats getStats() { @@ -639,7 +700,9 @@ public class PersistentReplicator extends AbstractReplicator implements Replicat // don't do anything for almost caught-up connected subscriptions return; } - expiryMonitor.expireMessages(messageTTLInSeconds); + if (expiryMonitor != null) { + expiryMonitor.expireMessages(messageTTLInSeconds); + } } @Override diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 34cad87..7eb4dd9 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -217,7 +217,7 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal if (cursor.getName().startsWith(replicatorPrefix)) { String localCluster = brokerService.pulsar().getConfiguration().getClusterName(); String remoteCluster = PersistentReplicator.getRemoteCluster(cursor.getName()); - boolean isReplicatorStarted = addReplicationCluster(remoteCluster, this, cursor, localCluster); + boolean isReplicatorStarted = addReplicationCluster(remoteCluster, this, cursor.getName(), localCluster); if (!isReplicatorStarted) { throw new NamingException( PersistentTopic.this.getName() + " Failed to start replicator " + remoteCluster); @@ -1156,37 +1156,26 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal log.info("[{}] 
Starting replicator to remote: {}", topic, remoteCluster); final CompletableFuture<Void> future = new CompletableFuture<>(); - String name = PersistentReplicator.getReplicatorName(replicatorPrefix, remoteCluster); - ledger.asyncOpenCursor(name, new OpenCursorCallback() { - @Override - public void openCursorComplete(ManagedCursor cursor, Object ctx) { - String localCluster = brokerService.pulsar().getConfiguration().getClusterName(); - boolean isReplicatorStarted = addReplicationCluster(remoteCluster, PersistentTopic.this, cursor, localCluster); - if (isReplicatorStarted) { - future.complete(null); - } else { - future.completeExceptionally(new NamingException( - PersistentTopic.this.getName() + " Failed to start replicator " + remoteCluster)); - } - } - - @Override - public void openCursorFailed(ManagedLedgerException exception, Object ctx) { - future.completeExceptionally(new PersistenceException(exception)); - } - - }, null); + String replicatorName = PersistentReplicator.getReplicatorName(replicatorPrefix, remoteCluster); + String localCluster = brokerService.pulsar().getConfiguration().getClusterName(); + boolean isReplicatorStarted = addReplicationCluster(remoteCluster, PersistentTopic.this, replicatorName, localCluster); + if (isReplicatorStarted) { + future.complete(null); + } else { + future.completeExceptionally(new NamingException( + PersistentTopic.this.getName() + " Failed to start replicator " + remoteCluster)); + } return future; } - protected boolean addReplicationCluster(String remoteCluster, PersistentTopic persistentTopic, ManagedCursor cursor, + protected boolean addReplicationCluster(String remoteCluster, PersistentTopic persistentTopic, String replicatorName, String localCluster) { AtomicBoolean isReplicatorStarted = new AtomicBoolean(true); replicators.computeIfAbsent(remoteCluster, r -> { try { - return new PersistentReplicator(PersistentTopic.this, cursor, localCluster, remoteCluster, - brokerService); + return new 
PersistentReplicator(PersistentTopic.this, replicatorName, localCluster, remoteCluster, + brokerService, ledger); } catch (NamingException e) { isReplicatorStarted.set(false); log.error("[{}] Replicator startup failed due to partitioned-topic {}", topic, remoteCluster);
