This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch branch-2.7 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 71b96a0579642bd6f8e8da0e59e041ec37304abe Author: Matteo Merli <[email protected]> AuthorDate: Sat May 22 10:13:55 2021 -0700 Revert "Creating a topic does not wait for creating cursor of replicators (#6364)" (#10674) This reverts commit 336e971f4d41d6ffb26b3b53a20f36a360c070e8. (cherry picked from commit e4864921358b89a32b49bc967a6e582fce992988) --- .../pulsar/broker/service/AbstractReplicator.java | 38 ++++------ .../nonpersistent/NonPersistentReplicator.java | 5 -- .../service/persistent/PersistentReplicator.java | 83 +++------------------- .../broker/service/persistent/PersistentTopic.java | 42 +++++++---- 4 files changed, 52 insertions(+), 116 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java index a55d21a..00801e5 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java @@ -122,38 +122,30 @@ public abstract class AbstractReplicator { log.info("[{}][{} -> {}] Replicator already being started. Replicator state: {}", topicName, localCluster, remoteCluster, state); } + return; } log.info("[{}][{} -> {}] Starting replicator", topicName, localCluster, remoteCluster); - openCursorAsync().thenAccept(v -> - producerBuilder.createAsync() - .thenAccept(this::readEntries) - .exceptionally(ex -> { - retryCreateProducer(ex); - return null; - })).exceptionally(ex -> { - retryCreateProducer(ex); + producerBuilder.createAsync().thenAccept(producer -> { + readEntries(producer); + }).exceptionally(ex -> { + if (STATE_UPDATER.compareAndSet(this, State.Starting, State.Stopped)) { + long waitTimeMs = backOff.next(); + log.warn("[{}][{} -> {}] Failed to create remote producer ({}), retrying in {} s", topicName, + localCluster, remoteCluster, ex.getMessage(), waitTimeMs / 1000.0); + + // BackOff before retrying + brokerService.executor().schedule(this::startProducer, waitTimeMs, TimeUnit.MILLISECONDS); + } else { + log.warn("[{}][{} -> {}] Failed to create remote producer. Replicator state: {}", topicName, + localCluster, remoteCluster, STATE_UPDATER.get(this), ex); + } return null; }); - } - private void retryCreateProducer(Throwable ex) { - if (STATE_UPDATER.compareAndSet(this, State.Starting, State.Stopped)) { - long waitTimeMs = backOff.next(); - log.warn("[{}][{} -> {}] Failed to create remote producer ({}), retrying in {} s", topicName, - localCluster, remoteCluster, ex.getMessage(), waitTimeMs / 1000.0); - - // BackOff before retrying - brokerService.executor().schedule(this::startProducer, waitTimeMs, TimeUnit.MILLISECONDS); - } else { - log.warn("[{}][{} -> {}] Failed to create remote producer. Replicator state: {}", topicName, - localCluster, remoteCluster, STATE_UPDATER.get(this), ex); - } } - protected abstract CompletableFuture<Void> openCursorAsync(); - protected synchronized CompletableFuture<Void> closeProducerAsync() { if (producer == null) { STATE_UPDATER.set(this, State.Stopped); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentReplicator.java index 8e4c53a..be9dbff0 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentReplicator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentReplicator.java @@ -252,11 +252,6 @@ public class NonPersistentReplicator extends AbstractReplicator implements Repli } @Override - protected CompletableFuture<Void> openCursorAsync() { - return CompletableFuture.completedFuture(null); - } - - @Override public boolean isConnected() { ProducerImpl<?> producer = this.producer; return producer != null && producer.isConnected(); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java index 8188a61..8c5428b 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java @@ -34,13 +34,11 @@ import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import org.apache.bookkeeper.mledger.AsyncCallbacks; import org.apache.bookkeeper.mledger.AsyncCallbacks.ClearBacklogCallback; import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteCallback; -import org.apache.bookkeeper.mledger.AsyncCallbacks.OpenCursorCallback; import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback; import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntryCallback; import org.apache.bookkeeper.mledger.Entry; import org.apache.bookkeeper.mledger.ManagedCursor; import org.apache.bookkeeper.mledger.ManagedCursor.IndividualDeletedEntries; -import org.apache.bookkeeper.mledger.ManagedLedger; import org.apache.bookkeeper.mledger.ManagedLedgerException; import org.apache.bookkeeper.mledger.ManagedLedgerException.CursorAlreadyClosedException; import org.apache.bookkeeper.mledger.ManagedLedgerException.TooManyRequestsException; @@ -48,7 +46,6 @@ import org.apache.bookkeeper.mledger.Position; import org.apache.pulsar.broker.service.AbstractReplicator; import org.apache.pulsar.broker.service.BrokerService; import org.apache.pulsar.broker.service.BrokerServiceException.NamingException; -import org.apache.pulsar.broker.service.BrokerServiceException.PersistenceException; import org.apache.pulsar.broker.service.BrokerServiceException.TopicBusyException; import org.apache.pulsar.broker.service.Replicator; import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter.Type; @@ -70,9 +67,7 @@ import org.slf4j.LoggerFactory; public class PersistentReplicator extends AbstractReplicator implements Replicator, ReadEntriesCallback, DeleteCallback { private final PersistentTopic topic; - private final String replicatorName; - private final ManagedLedger ledger; - protected volatile ManagedCursor cursor; + protected final ManagedCursor cursor; private Optional<DispatchRateLimiter> dispatchRateLimiter = Optional.empty(); @@ -99,41 +94,19 @@ public class PersistentReplicator extends AbstractReplicator implements Replicat private final Backoff readFailureBackoff = new Backoff(1, TimeUnit.SECONDS, 1, TimeUnit.MINUTES, 0, TimeUnit.MILLISECONDS); - private PersistentMessageExpiryMonitor expiryMonitor; + private final PersistentMessageExpiryMonitor expiryMonitor; // for connected subscriptions, message expiry will be checked if the backlog is greater than this threshold private static final int MINIMUM_BACKLOG_FOR_EXPIRY_CHECK = 1000; private final ReplicatorStats stats = new ReplicatorStats(); - // Only for test public PersistentReplicator(PersistentTopic topic, ManagedCursor cursor, String localCluster, String remoteCluster, BrokerService brokerService) throws NamingException { super(topic.getName(), topic.getReplicatorPrefix(), localCluster, remoteCluster, brokerService); - this.replicatorName = cursor.getName(); - this.ledger = cursor.getManagedLedger(); - this.cursor = cursor; - this.topic = topic; - this.expiryMonitor = new PersistentMessageExpiryMonitor(topicName, Codec.decode(cursor.getName()), cursor, null); - HAVE_PENDING_READ_UPDATER.set(this, FALSE); - PENDING_MESSAGES_UPDATER.set(this, 0); - - readBatchSize = Math.min( - producerQueueSize, - topic.getBrokerService().pulsar().getConfiguration().getDispatcherMaxReadBatchSize()); - readMaxSizeBytes = topic.getBrokerService().pulsar().getConfiguration().getDispatcherMaxReadSizeBytes(); - producerQueueThreshold = (int) (producerQueueSize * 0.9); - - this.initializeDispatchRateLimiterIfNeeded(Optional.empty()); - - startProducer(); - } - - public PersistentReplicator(PersistentTopic topic, String replicatorName, String localCluster, String remoteCluster, - BrokerService brokerService, ManagedLedger ledger) throws NamingException { - super(topic.getName(), topic.getReplicatorPrefix(), localCluster, remoteCluster, brokerService); - this.replicatorName = replicatorName; - this.ledger = ledger; this.topic = topic; + this.cursor = cursor; + this.expiryMonitor = new PersistentMessageExpiryMonitor(topicName, + Codec.decode(cursor.getName()), cursor, null); HAVE_PENDING_READ_UPDATER.set(this, FALSE); PENDING_MESSAGES_UPDATER.set(this, 0); @@ -192,37 +165,6 @@ public class PersistentReplicator extends AbstractReplicator implements Replicat } } - @Override - protected CompletableFuture<Void> openCursorAsync() { - log.info("[{}][{} -> {}] Starting open cursor for replicator", topicName, localCluster, remoteCluster); - if (cursor != null) { - log.info("[{}][{} -> {}] Using the exists cursor for replicator", topicName, localCluster, remoteCluster); - if (expiryMonitor == null) { - this.expiryMonitor = new PersistentMessageExpiryMonitor(topicName, Codec.decode(cursor.getName()), cursor, null); - } - return CompletableFuture.completedFuture(null); - } - CompletableFuture<Void> res = new CompletableFuture<>(); - ledger.asyncOpenCursor(replicatorName, InitialPosition.Earliest, new OpenCursorCallback() { - @Override - public void openCursorComplete(ManagedCursor cursor, Object ctx) { - log.info("[{}][{} -> {}] Open cursor succeed for replicator", topicName, localCluster, remoteCluster); - PersistentReplicator.this.cursor = cursor; - PersistentReplicator.this.expiryMonitor = new PersistentMessageExpiryMonitor(topicName, Codec.decode(cursor.getName()), cursor, null); - res.complete(null); - } - - @Override - public void openCursorFailed(ManagedLedgerException exception, Object ctx) { - log.warn("[{}][{} -> {}] Open cursor failed for replicator", topicName, localCluster, remoteCluster, exception); - res.completeExceptionally(new PersistenceException(exception)); - } - - }, null); - return res; - } - - /** * Calculate available permits for read entries. * @@ -671,9 +613,7 @@ public class PersistentReplicator extends AbstractReplicator implements Replicat msgExpired.calculateRate(); stats.msgRateOut = msgOut.getRate(); stats.msgThroughputOut = msgOut.getValueRate(); - if (expiryMonitor != null) { - stats.msgRateExpired = msgExpired.getRate() + expiryMonitor.getMessageExpiryRate(); - } + stats.msgRateExpired = msgExpired.getRate() + expiryMonitor.getMessageExpiryRate(); } public ReplicatorStats getStats() { @@ -711,17 +651,12 @@ public class PersistentReplicator extends AbstractReplicator implements Replicat // don't do anything for almost caught-up connected subscriptions return false; } - if (expiryMonitor != null) { - return expiryMonitor.expireMessages(messageTTLInSeconds); - } - return false; + + return expiryMonitor.expireMessages(messageTTLInSeconds); } public boolean expireMessages(Position position) { - if (expiryMonitor != null) { - return expiryMonitor.expireMessages(position); - } - return false; + return expiryMonitor.expireMessages(position); } @Override diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index f4f6e49..bae9a77 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -245,7 +245,12 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal if (cursor.getName().startsWith(replicatorPrefix)) { String localCluster = brokerService.pulsar().getConfiguration().getClusterName(); String remoteCluster = PersistentReplicator.getRemoteCluster(cursor.getName()); - boolean isReplicatorStarted = addReplicationCluster(remoteCluster, this, cursor.getName(), localCluster); + boolean isReplicatorStarted = false; + try { + isReplicatorStarted = addReplicationCluster(remoteCluster, cursor, localCluster); + } catch (Exception e) { + log.warn("[{}] failed to start replication", topic, e); + } if (!isReplicatorStarted) { throw new NamingException( PersistentTopic.this.getName() + " Failed to start replicator " + remoteCluster); @@ -1272,26 +1277,35 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal log.info("[{}] Starting replicator to remote: {}", topic, remoteCluster); final CompletableFuture<Void> future = new CompletableFuture<>(); - String replicatorName = PersistentReplicator.getReplicatorName(replicatorPrefix, remoteCluster); - String localCluster = brokerService.pulsar().getConfiguration().getClusterName(); - boolean isReplicatorStarted = addReplicationCluster(remoteCluster, PersistentTopic.this, replicatorName, localCluster); - if (isReplicatorStarted) { - future.complete(null); - } else { - future.completeExceptionally(new NamingException( - PersistentTopic.this.getName() + " Failed to start replicator " + remoteCluster)); - } + String name = PersistentReplicator.getReplicatorName(replicatorPrefix, remoteCluster); + ledger.asyncOpenCursor(name, new OpenCursorCallback() { + @Override + public void openCursorComplete(ManagedCursor cursor, Object ctx) { + String localCluster = brokerService.pulsar().getConfiguration().getClusterName(); + boolean isReplicatorStarted = addReplicationCluster(remoteCluster, cursor, localCluster); + if (isReplicatorStarted) { + future.complete(null); + } else { + future.completeExceptionally(new NamingException( + PersistentTopic.this.getName() + " Failed to start replicator " + remoteCluster)); + } + } + @Override + public void openCursorFailed(ManagedLedgerException exception, Object ctx) { + future.completeExceptionally(new PersistenceException(exception)); + } + + }, null); return future; } - protected boolean addReplicationCluster(String remoteCluster, PersistentTopic persistentTopic, String replicatorName, - String localCluster) { + protected boolean addReplicationCluster(String remoteCluster, ManagedCursor cursor, String localCluster) { AtomicBoolean isReplicatorStarted = new AtomicBoolean(true); replicators.computeIfAbsent(remoteCluster, r -> { try { - return new PersistentReplicator(PersistentTopic.this, replicatorName, localCluster, remoteCluster, - brokerService, ledger); + return new PersistentReplicator(PersistentTopic.this, cursor, localCluster, remoteCluster, + brokerService); } catch (NamingException e) { isReplicatorStarted.set(false); log.error("[{}] Replicator startup failed due to partitioned-topic {}", topic, remoteCluster);
