This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 71b96a0579642bd6f8e8da0e59e041ec37304abe
Author: Matteo Merli <[email protected]>
AuthorDate: Sat May 22 10:13:55 2021 -0700

    Revert "Creating a topic does not wait for creating cursor of replicators (#6364)" (#10674)
    
    This reverts commit 336e971f4d41d6ffb26b3b53a20f36a360c070e8.
    
    (cherry picked from commit e4864921358b89a32b49bc967a6e582fce992988)
---
 .../pulsar/broker/service/AbstractReplicator.java  | 38 ++++------
 .../nonpersistent/NonPersistentReplicator.java     |  5 --
 .../service/persistent/PersistentReplicator.java   | 83 +++-------------------
 .../broker/service/persistent/PersistentTopic.java | 42 +++++++----
 4 files changed, 52 insertions(+), 116 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java
index a55d21a..00801e5 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java
@@ -122,38 +122,30 @@ public abstract class AbstractReplicator {
                 log.info("[{}][{} -> {}] Replicator already being started. 
Replicator state: {}", topicName,
                         localCluster, remoteCluster, state);
             }
+
             return;
         }
 
         log.info("[{}][{} -> {}] Starting replicator", topicName, 
localCluster, remoteCluster);
-        openCursorAsync().thenAccept(v ->
-            producerBuilder.createAsync()
-                .thenAccept(this::readEntries)
-                .exceptionally(ex -> {
-                    retryCreateProducer(ex);
-                    return null;
-        })).exceptionally(ex -> {
-            retryCreateProducer(ex);
+        producerBuilder.createAsync().thenAccept(producer -> {
+            readEntries(producer);
+        }).exceptionally(ex -> {
+            if (STATE_UPDATER.compareAndSet(this, State.Starting, 
State.Stopped)) {
+                long waitTimeMs = backOff.next();
+                log.warn("[{}][{} -> {}] Failed to create remote producer 
({}), retrying in {} s", topicName,
+                        localCluster, remoteCluster, ex.getMessage(), 
waitTimeMs / 1000.0);
+
+                // BackOff before retrying
+                brokerService.executor().schedule(this::startProducer, 
waitTimeMs, TimeUnit.MILLISECONDS);
+            } else {
+                log.warn("[{}][{} -> {}] Failed to create remote producer. 
Replicator state: {}", topicName,
+                        localCluster, remoteCluster, STATE_UPDATER.get(this), 
ex);
+            }
             return null;
         });
-    }
 
-    private void retryCreateProducer(Throwable ex) {
-        if (STATE_UPDATER.compareAndSet(this, State.Starting, State.Stopped)) {
-            long waitTimeMs = backOff.next();
-            log.warn("[{}][{} -> {}] Failed to create remote producer ({}), 
retrying in {} s", topicName,
-                localCluster, remoteCluster, ex.getMessage(), waitTimeMs / 
1000.0);
-
-            // BackOff before retrying
-            brokerService.executor().schedule(this::startProducer, waitTimeMs, 
TimeUnit.MILLISECONDS);
-        } else {
-            log.warn("[{}][{} -> {}] Failed to create remote producer. 
Replicator state: {}", topicName,
-                localCluster, remoteCluster, STATE_UPDATER.get(this), ex);
-        }
     }
 
-    protected abstract CompletableFuture<Void> openCursorAsync();
-
     protected synchronized CompletableFuture<Void> closeProducerAsync() {
         if (producer == null) {
             STATE_UPDATER.set(this, State.Stopped);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentReplicator.java
index 8e4c53a..be9dbff0 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentReplicator.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentReplicator.java
@@ -252,11 +252,6 @@ public class NonPersistentReplicator extends AbstractReplicator implements Repli
     }
 
     @Override
-    protected CompletableFuture<Void> openCursorAsync() {
-        return CompletableFuture.completedFuture(null);
-    }
-
-    @Override
     public boolean isConnected() {
         ProducerImpl<?> producer = this.producer;
         return producer != null && producer.isConnected();
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
index 8188a61..8c5428b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
@@ -34,13 +34,11 @@ import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 import org.apache.bookkeeper.mledger.AsyncCallbacks;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.ClearBacklogCallback;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteCallback;
-import org.apache.bookkeeper.mledger.AsyncCallbacks.OpenCursorCallback;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntryCallback;
 import org.apache.bookkeeper.mledger.Entry;
 import org.apache.bookkeeper.mledger.ManagedCursor;
 import org.apache.bookkeeper.mledger.ManagedCursor.IndividualDeletedEntries;
-import org.apache.bookkeeper.mledger.ManagedLedger;
 import org.apache.bookkeeper.mledger.ManagedLedgerException;
 import 
org.apache.bookkeeper.mledger.ManagedLedgerException.CursorAlreadyClosedException;
 import 
org.apache.bookkeeper.mledger.ManagedLedgerException.TooManyRequestsException;
@@ -48,7 +46,6 @@ import org.apache.bookkeeper.mledger.Position;
 import org.apache.pulsar.broker.service.AbstractReplicator;
 import org.apache.pulsar.broker.service.BrokerService;
 import org.apache.pulsar.broker.service.BrokerServiceException.NamingException;
-import 
org.apache.pulsar.broker.service.BrokerServiceException.PersistenceException;
 import 
org.apache.pulsar.broker.service.BrokerServiceException.TopicBusyException;
 import org.apache.pulsar.broker.service.Replicator;
 import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter.Type;
@@ -70,9 +67,7 @@ import org.slf4j.LoggerFactory;
 public class PersistentReplicator extends AbstractReplicator implements 
Replicator, ReadEntriesCallback, DeleteCallback {
 
     private final PersistentTopic topic;
-    private final String replicatorName;
-    private final ManagedLedger ledger;
-    protected volatile ManagedCursor cursor;
+    protected final ManagedCursor cursor;
 
     private Optional<DispatchRateLimiter> dispatchRateLimiter = 
Optional.empty();
 
@@ -99,41 +94,19 @@ public class PersistentReplicator extends AbstractReplicator implements Replicat
 
     private final Backoff readFailureBackoff = new Backoff(1, 
TimeUnit.SECONDS, 1, TimeUnit.MINUTES, 0, TimeUnit.MILLISECONDS);
 
-    private PersistentMessageExpiryMonitor expiryMonitor;
+    private final PersistentMessageExpiryMonitor expiryMonitor;
     // for connected subscriptions, message expiry will be checked if the 
backlog is greater than this threshold
     private static final int MINIMUM_BACKLOG_FOR_EXPIRY_CHECK = 1000;
 
     private final ReplicatorStats stats = new ReplicatorStats();
 
-    // Only for test
     public PersistentReplicator(PersistentTopic topic, ManagedCursor cursor, 
String localCluster, String remoteCluster,
             BrokerService brokerService) throws NamingException {
         super(topic.getName(), topic.getReplicatorPrefix(), localCluster, 
remoteCluster, brokerService);
-        this.replicatorName = cursor.getName();
-        this.ledger = cursor.getManagedLedger();
-        this.cursor = cursor;
-        this.topic = topic;
-        this.expiryMonitor = new PersistentMessageExpiryMonitor(topicName, 
Codec.decode(cursor.getName()), cursor, null);
-        HAVE_PENDING_READ_UPDATER.set(this, FALSE);
-        PENDING_MESSAGES_UPDATER.set(this, 0);
-
-        readBatchSize = Math.min(
-            producerQueueSize,
-            
topic.getBrokerService().pulsar().getConfiguration().getDispatcherMaxReadBatchSize());
-        readMaxSizeBytes = 
topic.getBrokerService().pulsar().getConfiguration().getDispatcherMaxReadSizeBytes();
-        producerQueueThreshold = (int) (producerQueueSize * 0.9);
-
-        this.initializeDispatchRateLimiterIfNeeded(Optional.empty());
-
-        startProducer();
-    }
-
-    public PersistentReplicator(PersistentTopic topic, String replicatorName, 
String localCluster, String remoteCluster,
-            BrokerService brokerService, ManagedLedger ledger) throws 
NamingException {
-        super(topic.getName(), topic.getReplicatorPrefix(), localCluster, 
remoteCluster, brokerService);
-        this.replicatorName = replicatorName;
-        this.ledger = ledger;
         this.topic = topic;
+        this.cursor = cursor;
+        this.expiryMonitor = new PersistentMessageExpiryMonitor(topicName,
+                Codec.decode(cursor.getName()), cursor, null);
         HAVE_PENDING_READ_UPDATER.set(this, FALSE);
         PENDING_MESSAGES_UPDATER.set(this, 0);
 
@@ -192,37 +165,6 @@ public class PersistentReplicator extends AbstractReplicator implements Replicat
         }
     }
 
-    @Override
-    protected CompletableFuture<Void> openCursorAsync() {
-        log.info("[{}][{} -> {}] Starting open cursor for replicator", 
topicName, localCluster, remoteCluster);
-        if (cursor != null) {
-            log.info("[{}][{} -> {}] Using the exists cursor for replicator", 
topicName, localCluster, remoteCluster);
-            if (expiryMonitor == null) {
-                this.expiryMonitor = new 
PersistentMessageExpiryMonitor(topicName, Codec.decode(cursor.getName()), 
cursor, null);
-            }
-            return CompletableFuture.completedFuture(null);
-        }
-        CompletableFuture<Void> res = new CompletableFuture<>();
-        ledger.asyncOpenCursor(replicatorName, InitialPosition.Earliest, new 
OpenCursorCallback() {
-            @Override
-            public void openCursorComplete(ManagedCursor cursor, Object ctx) {
-                log.info("[{}][{} -> {}] Open cursor succeed for replicator", 
topicName, localCluster, remoteCluster);
-                PersistentReplicator.this.cursor = cursor;
-                PersistentReplicator.this.expiryMonitor = new 
PersistentMessageExpiryMonitor(topicName, Codec.decode(cursor.getName()), 
cursor, null);
-                res.complete(null);
-            }
-
-            @Override
-            public void openCursorFailed(ManagedLedgerException exception, 
Object ctx) {
-                log.warn("[{}][{} -> {}] Open cursor failed for replicator", 
topicName, localCluster, remoteCluster, exception);
-                res.completeExceptionally(new PersistenceException(exception));
-            }
-
-        }, null);
-        return res;
-    }
-
-
     /**
      * Calculate available permits for read entries.
      *
@@ -671,9 +613,7 @@ public class PersistentReplicator extends AbstractReplicator implements Replicat
         msgExpired.calculateRate();
         stats.msgRateOut = msgOut.getRate();
         stats.msgThroughputOut = msgOut.getValueRate();
-        if (expiryMonitor != null) {
-            stats.msgRateExpired = msgExpired.getRate() + 
expiryMonitor.getMessageExpiryRate();
-        }
+        stats.msgRateExpired = msgExpired.getRate() + 
expiryMonitor.getMessageExpiryRate();
     }
 
     public ReplicatorStats getStats() {
@@ -711,17 +651,12 @@ public class PersistentReplicator extends AbstractReplicator implements Replicat
             // don't do anything for almost caught-up connected subscriptions
             return false;
         }
-        if (expiryMonitor != null) {
-            return expiryMonitor.expireMessages(messageTTLInSeconds);
-        }
-        return false;
+
+        return expiryMonitor.expireMessages(messageTTLInSeconds);
     }
 
     public boolean expireMessages(Position position) {
-        if (expiryMonitor != null) {
-            return expiryMonitor.expireMessages(position);
-        }
-        return false;
+        return expiryMonitor.expireMessages(position);
     }
 
     @Override
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index f4f6e49..bae9a77 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -245,7 +245,12 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
             if (cursor.getName().startsWith(replicatorPrefix)) {
                 String localCluster = 
brokerService.pulsar().getConfiguration().getClusterName();
                 String remoteCluster = 
PersistentReplicator.getRemoteCluster(cursor.getName());
-                boolean isReplicatorStarted = 
addReplicationCluster(remoteCluster, this, cursor.getName(), localCluster);
+                boolean isReplicatorStarted = false;
+                try {
+                    isReplicatorStarted = addReplicationCluster(remoteCluster, 
cursor, localCluster);
+                } catch (Exception e) {
+                    log.warn("[{}] failed to start replication", topic, e);
+                }
                 if (!isReplicatorStarted) {
                     throw new NamingException(
                         PersistentTopic.this.getName() + " Failed to start 
replicator " + remoteCluster);
@@ -1272,26 +1277,35 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
         log.info("[{}] Starting replicator to remote: {}", topic, 
remoteCluster);
         final CompletableFuture<Void> future = new CompletableFuture<>();
 
-        String replicatorName = 
PersistentReplicator.getReplicatorName(replicatorPrefix, remoteCluster);
-        String localCluster = 
brokerService.pulsar().getConfiguration().getClusterName();
-        boolean isReplicatorStarted = addReplicationCluster(remoteCluster, 
PersistentTopic.this, replicatorName, localCluster);
-        if (isReplicatorStarted) {
-            future.complete(null);
-        } else {
-            future.completeExceptionally(new NamingException(
-                PersistentTopic.this.getName() + " Failed to start replicator 
" + remoteCluster));
-        }
+        String name = PersistentReplicator.getReplicatorName(replicatorPrefix, 
remoteCluster);
+        ledger.asyncOpenCursor(name, new OpenCursorCallback() {
+            @Override
+            public void openCursorComplete(ManagedCursor cursor, Object ctx) {
+                String localCluster = 
brokerService.pulsar().getConfiguration().getClusterName();
+                boolean isReplicatorStarted = 
addReplicationCluster(remoteCluster, cursor, localCluster);
+                if (isReplicatorStarted) {
+                    future.complete(null);
+                } else {
+                    future.completeExceptionally(new NamingException(
+                            PersistentTopic.this.getName() + " Failed to start 
replicator " + remoteCluster));
+                }
+            }
 
+            @Override
+            public void openCursorFailed(ManagedLedgerException exception, 
Object ctx) {
+                future.completeExceptionally(new 
PersistenceException(exception));
+            }
+
+        }, null);
         return future;
     }
 
-    protected boolean addReplicationCluster(String remoteCluster, 
PersistentTopic persistentTopic, String replicatorName,
-            String localCluster) {
+    protected boolean addReplicationCluster(String remoteCluster, 
ManagedCursor cursor, String localCluster) {
         AtomicBoolean isReplicatorStarted = new AtomicBoolean(true);
         replicators.computeIfAbsent(remoteCluster, r -> {
             try {
-                return new PersistentReplicator(PersistentTopic.this, 
replicatorName, localCluster, remoteCluster,
-                        brokerService, ledger);
+                return new PersistentReplicator(PersistentTopic.this, cursor, 
localCluster, remoteCluster,
+                        brokerService);
             } catch (NamingException e) {
                 isReplicatorStarted.set(false);
                 log.error("[{}] Replicator startup failed due to 
partitioned-topic {}", topic, remoteCluster);

Reply via email to