This is an automated email from the ASF dual-hosted git repository.

xyz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new d1fc7323cbf [fix][broker] Ignore and remove the replicator cursor when 
the remote cluster is absent (#19972)
d1fc7323cbf is described below

commit d1fc7323cbf61a6d2955486fc123fdde5253e72c
Author: Yunze Xu <xyzinfern...@163.com>
AuthorDate: Tue Apr 4 10:31:33 2023 +0800

    [fix][broker] Ignore and remove the replicator cursor when the remote 
cluster is absent (#19972)
---
 .../broker/service/persistent/PersistentTopic.java | 30 ++++++++---
 .../service/persistent/PersistentTopicTest.java    | 63 ++++++++++++++++++++++
 2 files changed, 87 insertions(+), 6 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index fa08330ff3c..18a662c4b7a 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -1696,14 +1696,32 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
         return future;
     }
 
+    private CompletableFuture<Boolean> checkReplicationCluster(String 
remoteCluster) {
+        return 
brokerService.getPulsar().getPulsarResources().getNamespaceResources()
+                .getPoliciesAsync(TopicName.get(topic).getNamespaceObject())
+                .thenApply(optPolicies -> optPolicies.map(policies -> 
policies.replication_clusters)
+                        .orElse(Collections.emptySet()).contains(remoteCluster)
+                        || 
topicPolicies.getReplicationClusters().get().contains(remoteCluster));
+    }
+
     protected CompletableFuture<Void> addReplicationCluster(String 
remoteCluster, ManagedCursor cursor,
             String localCluster) {
         return 
AbstractReplicator.validatePartitionedTopicAsync(PersistentTopic.this.getName(),
 brokerService)
-                .thenCompose(__ -> 
brokerService.pulsar().getPulsarResources().getClusterResources()
-                        .getClusterAsync(remoteCluster)
-                        .thenApply(clusterData ->
-                                
brokerService.getReplicationClient(remoteCluster, clusterData)))
+                .thenCompose(__ -> checkReplicationCluster(remoteCluster))
+                .thenCompose(clusterExists -> {
+                    if (!clusterExists) {
+                        log.warn("Remove the replicator because the cluster 
'{}' does not exist", remoteCluster);
+                        return removeReplicator(remoteCluster).thenApply(__ -> 
null);
+                    }
+                    return 
brokerService.pulsar().getPulsarResources().getClusterResources()
+                            .getClusterAsync(remoteCluster)
+                            .thenApply(clusterData ->
+                                    
brokerService.getReplicationClient(remoteCluster, clusterData));
+                })
                 .thenAccept(replicationClient -> {
+                    if (replicationClient == null) {
+                        return;
+                    }
                     Replicator replicator = 
replicators.computeIfAbsent(remoteCluster, r -> {
                         try {
                             return new 
GeoPersistentReplicator(PersistentTopic.this, cursor, localCluster,
@@ -1727,8 +1745,8 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
 
         String name = PersistentReplicator.getReplicatorName(replicatorPrefix, 
remoteCluster);
 
-        replicators.get(remoteCluster).disconnect().thenRun(() -> {
-
+        
Optional.ofNullable(replicators.get(remoteCluster)).map(Replicator::disconnect)
+                .orElse(CompletableFuture.completedFuture(null)).thenRun(() -> 
{
             ledger.asyncDeleteCursor(name, new DeleteCursorCallback() {
                 @Override
                 public void deleteCursorComplete(Object ctx) {
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java
index 80a79e0234d..c63be7aad01 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java
@@ -40,15 +40,20 @@ import java.lang.reflect.Field;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Supplier;
 import lombok.Cleanup;
 import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.bookkeeper.mledger.ManagedCursor;
 import org.apache.bookkeeper.mledger.ManagedLedger;
 import org.apache.pulsar.broker.service.BrokerService;
 import org.apache.pulsar.broker.service.BrokerTestBase;
@@ -57,6 +62,7 @@ import 
org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsGenerator;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.MessageListener;
 import org.apache.pulsar.client.api.MessageRoutingMode;
 import org.apache.pulsar.client.api.Producer;
@@ -66,7 +72,9 @@ import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.common.naming.NamespaceBundle;
 import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.Policies;
+import org.apache.pulsar.common.policies.data.TenantInfo;
 import org.apache.pulsar.common.policies.data.TopicStats;
 import org.awaitility.Awaitility;
 import org.junit.Assert;
@@ -525,4 +533,59 @@ public class PersistentTopicTest extends BrokerTestBase {
         makeDeletedFailed.set(false);
         persistentTopic.delete().get();
     }
+
+    @DataProvider(name = "topicLevelPolicy")
+    public static Object[][] topicLevelPolicy() {
+        return new Object[][] { { true }, { false } };
+    }
+
+    @Test(dataProvider = "topicLevelPolicy")
+    public void testCreateTopicWithZombieReplicatorCursor(boolean 
topicLevelPolicy) throws Exception {
+        final String namespace = "prop/ns-abc";
+        final String topicName = "persistent://" + namespace
+                + "/testCreateTopicWithZombieReplicatorCursor" + 
topicLevelPolicy;
+        final String remoteCluster = "remote";
+        admin.topics().createNonPartitionedTopic(topicName);
+        admin.topics().createSubscription(topicName, 
conf.getReplicatorPrefix() + "." + remoteCluster,
+                MessageId.earliest, true);
+
+        admin.clusters().createCluster(remoteCluster, ClusterData.builder()
+                .serviceUrl("http://localhost:11112";)
+                .brokerServiceUrl("pulsar://localhost:11111")
+                .build());
+        TenantInfo tenantInfo = admin.tenants().getTenantInfo("prop");
+        tenantInfo.getAllowedClusters().add(remoteCluster);
+        admin.tenants().updateTenant("prop", tenantInfo);
+
+        if (topicLevelPolicy) {
+            admin.topics().setReplicationClusters(topicName, 
Collections.singletonList(remoteCluster));
+        } else {
+            admin.namespaces().setNamespaceReplicationClustersAsync(
+                    namespace, Collections.singleton(remoteCluster)).get();
+        }
+
+        final PersistentTopic topic = (PersistentTopic) 
pulsar.getBrokerService().getTopic(topicName, false)
+                .get(3, TimeUnit.SECONDS).orElse(null);
+        assertNotNull(topic);
+
+        final Supplier<Set<String>> getCursors = () -> {
+            final Set<String> cursors = new HashSet<>();
+            final Iterable<ManagedCursor> iterable = 
topic.getManagedLedger().getCursors();
+            iterable.forEach(c -> cursors.add(c.getName()));
+            return cursors;
+        };
+        assertEquals(getCursors.get(), 
Collections.singleton(conf.getReplicatorPrefix() + "." + remoteCluster));
+
+        if (topicLevelPolicy) {
+            admin.topics().setReplicationClusters(topicName, 
Collections.emptyList());
+        } else {
+            admin.namespaces().setNamespaceReplicationClustersAsync(namespace, 
Collections.emptySet()).get();
+        }
+        admin.clusters().deleteCluster(remoteCluster);
+        // Now the cluster and its related policy has been removed but the 
replicator cursor still exists
+
+        topic.initialize().get(3, TimeUnit.SECONDS);
+        Awaitility.await().atMost(3, TimeUnit.SECONDS)
+                .until(() -> 
!topic.getManagedLedger().getCursors().iterator().hasNext());
+    }
 }

Reply via email to