This is an automated email from the ASF dual-hosted git repository.

yubiao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 6372b9c8f54 [fix] [broker] fix deadlock when disable topic level Geo-Replication (#22738)
6372b9c8f54 is described below

commit 6372b9c8f5448757cdb56dc70aaf0ebb52a1063e
Author: fengyubiao <[email protected]>
AuthorDate: Sun May 19 11:28:21 2024 +0800

    [fix] [broker] fix deadlock when disable topic level Geo-Replication (#22738)
---
 .../broker/admin/impl/PersistentTopicsBase.java    |   6 +-
 .../broker/service/OneWayReplicatorTest.java       |  59 +++++++
 .../broker/service/OneWayReplicatorTestBase.java   | 172 +++++++++++++++++----
 .../service/OneWayReplicatorUsingGlobalZKTest.java |  97 ++++++++++++
 4 files changed, 306 insertions(+), 28 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 682f41dcdb6..924b7be0855 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -3262,12 +3262,14 @@ public class PersistentTopicsBase extends AdminResource 
{
         }
         Set<String> replicationClusters = Sets.newHashSet(clusterIds);
         return validatePoliciesReadOnlyAccessAsync()
-                .thenCompose(__ -> {
+                .thenAccept(__ -> {
                     if (replicationClusters.contains("global")) {
                         throw new RestException(Status.PRECONDITION_FAILED,
                                 "Cannot specify global in the list of 
replication clusters");
                     }
-                    Set<String> clusters = clusters();
+                })
+                .thenCompose(__ -> clustersAsync())
+                .thenCompose(clusters -> {
                     List<CompletableFuture<Void>> futures = new 
ArrayList<>(replicationClusters.size());
                     for (String clusterId : replicationClusters) {
                         if (!clusters.contains(clusterId)) {
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java
index fae72e8eac2..a5f1339e95f 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java
@@ -689,4 +689,63 @@ public class OneWayReplicatorTest extends 
OneWayReplicatorTestBase {
             admin2.topics().delete(topicName);
         });
     }
+
+    @Test
+    public void testDeleteNonPartitionedTopic() throws Exception {
+        final String topicName = BrokerTestUtil.newUniqueName("persistent://" 
+ replicatedNamespace + "/tp_");
+        admin1.topics().createNonPartitionedTopic(topicName);
+
+        // Verify replicator works.
+        verifyReplicationWorks(topicName);
+
+        // Disable replication.
+        setTopicLevelClusters(topicName, Arrays.asList(cluster1), admin1, 
pulsar1);
+        setTopicLevelClusters(topicName, Arrays.asList(cluster2), admin2, 
pulsar2);
+
+        // Delete topic.
+        admin1.topics().delete(topicName);
+        admin2.topics().delete(topicName);
+
+        // Verify the topic was deleted.
+        assertFalse(pulsar1.getPulsarResources().getTopicResources()
+                .persistentTopicExists(TopicName.get(topicName)).join());
+        assertFalse(pulsar2.getPulsarResources().getTopicResources()
+                .persistentTopicExists(TopicName.get(topicName)).join());
+    }
+
+    @Test
+    public void testDeletePartitionedTopic() throws Exception {
+        final String topic = BrokerTestUtil.newUniqueName("persistent://" + replicatedNamespace + "/tp_");
+        final TopicName tn = TopicName.get(topic);
+        final int numPartitions = 2;
+        admin1.topics().createPartitionedTopic(topic, numPartitions);
+
+        // Confirm messages flow from cluster1 to cluster2 before changing any policy.
+        verifyReplicationWorks(topic);
+
+        // Turn off topic-level geo-replication: each cluster keeps only itself.
+        setTopicLevelClusters(topic, Arrays.asList(cluster1), admin1, pulsar1);
+        setTopicLevelClusters(topic, Arrays.asList(cluster2), admin2, pulsar2);
+
+        // With global ZK, the partitioned topic is deleted only through cluster1; the assertions below
+        // expect cluster2 to observe that removal as well.
+        admin1.topics().deletePartitionedTopic(topic);
+        if (!usingGlobalZK) {
+            admin2.topics().deletePartitionedTopic(topic);
+        }
+
+        // The partitioned-topic metadata must be gone on both clusters.
+        assertFalse(pulsar1.getPulsarResources().getNamespaceResources().getPartitionedTopicResources()
+                .partitionedTopicExists(tn));
+        assertFalse(pulsar2.getPulsarResources().getNamespaceResources().getPartitionedTopicResources()
+                .partitionedTopicExists(tn));
+
+        // When using global ZK, the partitions on the remote cluster currently need to be deleted manually,
+        // so the per-partition checks only apply when each cluster has its own ZK.
+        if (usingGlobalZK) {
+            return;
+        }
+        for (int partition = 0; partition < numPartitions; partition++) {
+            TopicName partitionName = tn.getPartition(partition);
+            assertFalse(pulsar1.getPulsarResources().getTopicResources()
+                    .persistentTopicExists(partitionName).join());
+            assertFalse(pulsar2.getPulsarResources().getTopicResources()
+                    .persistentTopicExists(partitionName).join());
+        }
+    }
 }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTestBase.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTestBase.java
index 317e43306e3..6a84432890c 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTestBase.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTestBase.java
@@ -21,21 +21,33 @@ package org.apache.pulsar.broker.service;
 import static org.apache.pulsar.compaction.Compactor.COMPACTION_SUBSCRIPTION;
 import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.assertEquals;
 import com.google.common.collect.Sets;
 import java.net.URL;
 import java.time.Duration;
+import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
 import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.api.ClientBuilder;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.common.naming.SystemTopicNames;
+import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.TenantInfoImpl;
+import org.apache.pulsar.common.policies.data.TopicPolicies;
 import org.apache.pulsar.common.policies.data.TopicType;
 import org.apache.pulsar.common.policies.data.stats.TopicStatsImpl;
 import org.apache.pulsar.tests.TestRetrySupport;
@@ -52,6 +64,9 @@ public abstract class OneWayReplicatorTestBase extends 
TestRetrySupport {
     protected final String nonReplicatedNamespace = defaultTenant + "/ns1";
 
     protected final String cluster1 = "r1";
+
+    protected boolean usingGlobalZK = false;
+
     protected URL url1;
     protected URL urlTls1;
     protected ServiceConfiguration config1 = new ServiceConfiguration();
@@ -77,8 +92,12 @@ public abstract class OneWayReplicatorTestBase extends 
TestRetrySupport {
         // Start ZK.
         brokerConfigZk1 = new ZookeeperServerTest(0);
         brokerConfigZk1.start();
-        brokerConfigZk2 = new ZookeeperServerTest(0);
-        brokerConfigZk2.start();
+        if (usingGlobalZK) {
+            brokerConfigZk2 = brokerConfigZk1;
+        } else {
+            brokerConfigZk2 = new ZookeeperServerTest(0);
+            brokerConfigZk2.start();
+        }
 
         // Start BK.
         bkEnsemble1 = new LocalBookkeeperEnsemble(3, 0, () -> 0);
@@ -132,30 +151,32 @@ public abstract class OneWayReplicatorTestBase extends 
TestRetrySupport {
                 .brokerServiceUrlTls(pulsar2.getBrokerServiceUrlTls())
                 .brokerClientTlsEnabled(false)
                 .build());
-        admin2.clusters().createCluster(cluster1, ClusterData.builder()
-                .serviceUrl(url1.toString())
-                .serviceUrlTls(urlTls1.toString())
-                .brokerServiceUrl(pulsar1.getBrokerServiceUrl())
-                .brokerServiceUrlTls(pulsar1.getBrokerServiceUrlTls())
-                .brokerClientTlsEnabled(false)
-                .build());
-        admin2.clusters().createCluster(cluster2, ClusterData.builder()
-                .serviceUrl(url2.toString())
-                .serviceUrlTls(urlTls2.toString())
-                .brokerServiceUrl(pulsar2.getBrokerServiceUrl())
-                .brokerServiceUrlTls(pulsar2.getBrokerServiceUrlTls())
-                .brokerClientTlsEnabled(false)
-                .build());
-
         admin1.tenants().createTenant(defaultTenant, new 
TenantInfoImpl(Collections.emptySet(),
                 Sets.newHashSet(cluster1, cluster2)));
-        admin2.tenants().createTenant(defaultTenant, new 
TenantInfoImpl(Collections.emptySet(),
-                Sets.newHashSet(cluster1, cluster2)));
-
         admin1.namespaces().createNamespace(replicatedNamespace, 
Sets.newHashSet(cluster1, cluster2));
-        admin2.namespaces().createNamespace(replicatedNamespace);
         admin1.namespaces().createNamespace(nonReplicatedNamespace);
-        admin2.namespaces().createNamespace(nonReplicatedNamespace);
+
+        if (!usingGlobalZK) {
+            admin2.clusters().createCluster(cluster1, ClusterData.builder()
+                    .serviceUrl(url1.toString())
+                    .serviceUrlTls(urlTls1.toString())
+                    .brokerServiceUrl(pulsar1.getBrokerServiceUrl())
+                    .brokerServiceUrlTls(pulsar1.getBrokerServiceUrlTls())
+                    .brokerClientTlsEnabled(false)
+                    .build());
+            admin2.clusters().createCluster(cluster2, ClusterData.builder()
+                    .serviceUrl(url2.toString())
+                    .serviceUrlTls(urlTls2.toString())
+                    .brokerServiceUrl(pulsar2.getBrokerServiceUrl())
+                    .brokerServiceUrlTls(pulsar2.getBrokerServiceUrlTls())
+                    .brokerClientTlsEnabled(false)
+                    .build());
+            admin2.tenants().createTenant(defaultTenant, new 
TenantInfoImpl(Collections.emptySet(),
+                    Sets.newHashSet(cluster1, cluster2)));
+            admin2.namespaces().createNamespace(replicatedNamespace);
+            admin2.namespaces().createNamespace(nonReplicatedNamespace);
+        }
+
     }
 
     protected void cleanupTopics(CleanupTopicAction cleanupTopicAction) throws 
Exception {
@@ -163,6 +184,9 @@ public abstract class OneWayReplicatorTestBase extends 
TestRetrySupport {
     }
 
     protected void cleanupTopics(String namespace, CleanupTopicAction 
cleanupTopicAction) throws Exception {
+        if (usingGlobalZK) {
+            throw new IllegalArgumentException("The method cleanupTopics does 
not support for global ZK");
+        }
         waitChangeEventsInit(namespace);
         admin1.namespaces().setNamespaceReplicationClusters(namespace, 
Collections.singleton(cluster1));
         admin1.namespaces().unload(namespace);
@@ -242,11 +266,15 @@ public abstract class OneWayReplicatorTestBase extends 
TestRetrySupport {
         // delete namespaces.
         waitChangeEventsInit(replicatedNamespace);
         
admin1.namespaces().setNamespaceReplicationClusters(replicatedNamespace, 
Sets.newHashSet(cluster1));
+        if (!usingGlobalZK) {
+            
admin2.namespaces().setNamespaceReplicationClusters(replicatedNamespace, 
Sets.newHashSet(cluster2));
+        }
         admin1.namespaces().deleteNamespace(replicatedNamespace);
-        
admin2.namespaces().setNamespaceReplicationClusters(replicatedNamespace, 
Sets.newHashSet(cluster2));
-        admin2.namespaces().deleteNamespace(replicatedNamespace);
         admin1.namespaces().deleteNamespace(nonReplicatedNamespace);
-        admin2.namespaces().deleteNamespace(nonReplicatedNamespace);
+        if (!usingGlobalZK) {
+            admin2.namespaces().deleteNamespace(replicatedNamespace);
+            admin2.namespaces().deleteNamespace(nonReplicatedNamespace);
+        }
 
         // shutdown.
         markCurrentSetupNumberCleaned();
@@ -291,7 +319,7 @@ public abstract class OneWayReplicatorTestBase extends 
TestRetrySupport {
             brokerConfigZk1.stop();
             brokerConfigZk1 = null;
         }
-        if (brokerConfigZk2 != null) {
+        if (!usingGlobalZK && brokerConfigZk2 != null) {
             brokerConfigZk2.stop();
             brokerConfigZk2 = null;
         }
@@ -313,4 +341,96 @@ public abstract class OneWayReplicatorTestBase extends 
TestRetrySupport {
     protected PulsarClient initClient(ClientBuilder clientBuilder) throws 
Exception {
         return clientBuilder.build();
     }
+
+    /**
+     * Sends one message to {@code topic} through cluster1 and asserts that a cluster2 consumer receives it
+     * within 10 seconds. On success the temporary subscription is removed. The producer and consumer are
+     * closed in every case, including when an assertion fails.
+     */
+    protected void verifyReplicationWorks(String topic) throws Exception {
+        final String subscription = "__subscribe_1";
+        final String msgValue = "__msg1";
+        try (Producer<String> producer1 = client1.newProducer(Schema.STRING).topic(topic).create();
+             Consumer<String> consumer2 = client2.newConsumer(Schema.STRING).topic(topic).isAckReceiptEnabled(true)
+                     .subscriptionName(subscription).subscribe()) {
+            producer1.newMessage().value(msgValue).send();
+            pulsar1.getBrokerService().checkReplicationPolicies();
+            // receive(timeout) yields null when nothing arrives in time; report that as a readable assertion
+            // failure instead of a NullPointerException.
+            org.apache.pulsar.client.api.Message<String> received = consumer2.receive(10, TimeUnit.SECONDS);
+            assertTrue(received != null, "No message replicated to " + topic + " within 10 seconds");
+            assertEquals(received.getValue(), msgValue);
+            consumer2.unsubscribe();
+        }
+    }
+
+    /**
+     * Sets the topic-level replication clusters of {@code topic} through {@code admin}, then waits until
+     * {@code pulsar} exposes exactly those clusters. The check covers both its topic policies service and every
+     * loaded topic instance (each partition, for a partitioned topic).
+     */
+    protected void setTopicLevelClusters(String topic, List<String> clusters, PulsarAdmin admin,
+                                         PulsarService pulsar) throws Exception {
+        Set<String> expected = new HashSet<>(clusters);
+        TopicName topicName = TopicName.get(TopicName.get(topic).getPartitionedTopicName());
+        int partitions = ensurePartitionsAreSame(topic);
+        admin.topics().setReplicationClusters(topic, clusters);
+        Awaitility.await().untilAsserted(() -> {
+            TopicPolicies policies = pulsar.getTopicPoliciesService().getTopicPolicies(topicName);
+            // The update may not have propagated yet. Report that as an AssertionError so Awaitility keeps
+            // polling, rather than aborting the wait with a NullPointerException.
+            assertTrue(policies != null && policies.getReplicationClusters() != null,
+                    "Topic-level replication clusters of " + topicName + " are not visible yet");
+            assertEquals(new HashSet<>(policies.getReplicationClusters()), expected);
+            if (partitions == 0) {
+                checkNonPartitionedTopicLevelClusters(topicName.toString(), clusters, admin, pulsar.getBrokerService());
+            } else {
+                for (int i = 0; i < partitions; i++) {
+                    checkNonPartitionedTopicLevelClusters(topicName.getPartition(i).toString(), clusters, admin,
+                            pulsar.getBrokerService());
+                }
+            }
+        });
+    }
+
+    /**
+     * Asserts that the topic instance loaded on {@code broker}, if any, has exactly {@code clusters} as its
+     * topic-level replication clusters. Returns without checking when the topic is not loaded on this broker.
+     * The {@code admin} parameter is not used.
+     */
+    protected void checkNonPartitionedTopicLevelClusters(String topic, List<String> clusters, PulsarAdmin admin,
+                                           BrokerService broker) throws Exception {
+        CompletableFuture<Optional<Topic>> future = broker.getTopic(topic, false);
+        if (future == null) {
+            return;
+        }
+        Optional<Topic> optional = future.join();
+        if (optional == null || !optional.isPresent()) {
+            return;
+        }
+        PersistentTopic persistentTopic = (PersistentTopic) optional.get();
+        Optional<TopicPolicies> topicPolicies = persistentTopic.getTopicPolicies();
+        // Missing policies mean the update has not reached this topic yet. Fail with an AssertionError so a
+        // surrounding Awaitility wait retries, instead of aborting on NoSuchElementException.
+        assertTrue(topicPolicies.isPresent() && topicPolicies.get().getReplicationClusters() != null,
+                "Topic-level replication clusters are not loaded yet for " + topic);
+        Set<String> expected = new HashSet<>(clusters);
+        Set<String> act = new HashSet<>(topicPolicies.get().getReplicationClusters());
+        assertEquals(act, expected);
+    }
+
+    /**
+     * Checks that cluster1 and cluster2 agree on whether {@code topic} is partitioned and, if so, on its
+     * partition count.
+     *
+     * @param topic a topic name; a partition name is first mapped to its partitioned-topic name
+     * @return 0 when the topic is not partitioned on either cluster, otherwise the shared partition count
+     * @throws IllegalArgumentException when the clusters disagree, or when partitioned metadata that was
+     *         reported to exist cannot be read
+     */
+    protected int ensurePartitionsAreSame(String topic) throws Exception {
+        TopicName topicName = TopicName.get(TopicName.get(topic).getPartitionedTopicName());
+        boolean isPartitionedTopic1 = pulsar1.getPulsarResources().getNamespaceResources()
+                .getPartitionedTopicResources().partitionedTopicExists(topicName);
+        boolean isPartitionedTopic2 = pulsar2.getPulsarResources().getNamespaceResources()
+                .getPartitionedTopicResources().partitionedTopicExists(topicName);
+        if (isPartitionedTopic1 != isPartitionedTopic2) {
+            // This helper runs before policy changes as well as before deletes, so the message describes the
+            // mismatch itself rather than a specific operation.
+            throw new IllegalArgumentException(String.format("Partition state of %s differs between clusters."
+                            + " isPartitionedTopic1: %s, isPartitionedTopic2: %s",
+                    topicName, isPartitionedTopic1, isPartitionedTopic2));
+        }
+        if (!isPartitionedTopic1) {
+            return 0;
+        }
+        int partitions1 = readPartitions(pulsar1, topicName, cluster1);
+        int partitions2 = readPartitions(pulsar2, topicName, cluster2);
+        if (partitions1 != partitions2) {
+            throw new IllegalArgumentException(String.format("Partition count of %s differs between clusters."
+                            + " partitions1: %s, partitions2: %s",
+                    topicName, partitions1, partitions2));
+        }
+        return partitions1;
+    }
+
+    /** Reads the partition count of {@code topicName} from the metadata seen by {@code pulsar}. */
+    private static int readPartitions(PulsarService pulsar, TopicName topicName, String clusterName) {
+        return pulsar.getPulsarResources().getNamespaceResources().getPartitionedTopicResources()
+                .getPartitionedTopicMetadataAsync(topicName).join()
+                .orElseThrow(() -> new IllegalArgumentException(String.format(
+                        "Partitioned metadata of %s is missing on cluster %s", topicName, clusterName)))
+                .partitions;
+    }
+
+    /**
+     * Disables topic-level geo-replication on both clusters (each cluster keeps only itself), then deletes
+     * {@code topic} from both clusters, whether it is partitioned or not.
+     */
+    protected void deleteTopicAfterDisableTopicLevelReplication(String topic) throws Exception {
+        // setTopicLevelClusters waits until each broker observes its new policy, so deletion starts only after
+        // replication has been switched off on both sides.
+        setTopicLevelClusters(topic, Arrays.asList(cluster1), admin1, pulsar1);
+        setTopicLevelClusters(topic, Arrays.asList(cluster2), admin2, pulsar2);
+
+        int partitions = ensurePartitionsAreSame(topic);
+
+        TopicName topicName = TopicName.get(TopicName.get(topic).getPartitionedTopicName());
+        if (partitions != 0) {
+            admin1.topics().deletePartitionedTopic(topicName.toString());
+            // Like testDeletePartitionedTopic, only cluster1 deletes the partitioned topic when both clusters
+            // share one ZooKeeper.
+            if (!usingGlobalZK) {
+                admin2.topics().deletePartitionedTopic(topicName.toString());
+            }
+        } else {
+            admin1.topics().delete(topicName.toString());
+            admin2.topics().delete(topicName.toString());
+        }
+    }
 }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java
new file mode 100644
index 00000000000..d827235bc32
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service;
+
+import lombok.extern.slf4j.Slf4j;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+/**
+ * Runs the {@link OneWayReplicatorTest} suite with {@code usingGlobalZK} enabled, so both clusters use the same
+ * broker-config ZooKeeper instead of one each.
+ *
+ * <p>Inherited tests that this mode does not support are turned off by overriding them with
+ * {@code @Test(enabled = false)}. Each override delegates to its own super method, so re-enabling it runs the
+ * intended test. NOTE(review): why each disabled test is unsupported under global ZK is not documented here.
+ */
+@Slf4j
+@Test(groups = "broker")
+public class OneWayReplicatorUsingGlobalZKTest extends OneWayReplicatorTest {
+
+    @Override
+    @BeforeClass(alwaysRun = true, timeOut = 300000)
+    public void setup() throws Exception {
+        // Must be set before super.setup(): the base class checks this flag when starting ZooKeeper and when
+        // creating the cluster2-side clusters, tenant and namespaces.
+        super.usingGlobalZK = true;
+        super.setup();
+    }
+
+    @Override
+    @AfterClass(alwaysRun = true, timeOut = 300000)
+    public void cleanup() throws Exception {
+        super.cleanup();
+    }
+
+    @Override
+    @Test(enabled = false)
+    public void testReplicatorProducerStatInTopic() throws Exception {
+        super.testReplicatorProducerStatInTopic();
+    }
+
+    @Override
+    @Test(enabled = false)
+    public void testCreateRemoteConsumerFirst() throws Exception {
+        super.testCreateRemoteConsumerFirst();
+    }
+
+    @Override
+    @Test(enabled = false)
+    public void testTopicCloseWhenInternalProducerCloseErrorOnce() throws Exception {
+        super.testTopicCloseWhenInternalProducerCloseErrorOnce();
+    }
+
+    @Override
+    @Test(enabled = false)
+    public void testConcurrencyOfUnloadBundleAndRecreateProducer() throws Exception {
+        super.testConcurrencyOfUnloadBundleAndRecreateProducer();
+    }
+
+    @Override
+    @Test(enabled = false)
+    public void testPartitionedTopicLevelReplication() throws Exception {
+        super.testPartitionedTopicLevelReplication();
+    }
+
+    @Override
+    @Test(enabled = false)
+    public void testPartitionedTopicLevelReplicationRemoteTopicExist() throws Exception {
+        super.testPartitionedTopicLevelReplicationRemoteTopicExist();
+    }
+
+    @Override
+    @Test(enabled = false)
+    public void testPartitionedTopicLevelReplicationRemoteConflictTopicExist() throws Exception {
+        super.testPartitionedTopicLevelReplicationRemoteConflictTopicExist();
+    }
+
+    @Override
+    @Test(enabled = false)
+    public void testConcurrencyOfUnloadBundleAndRecreateProducer2() throws Exception {
+        super.testConcurrencyOfUnloadBundleAndRecreateProducer2();
+    }
+
+    @Override
+    @Test(enabled = false)
+    public void testUnFenceTopicToReuse() throws Exception {
+        super.testUnFenceTopicToReuse();
+    }
+
+    @Override
+    @Test
+    public void testDeleteNonPartitionedTopic() throws Exception {
+        super.testDeleteNonPartitionedTopic();
+    }
+
+    @Override
+    @Test
+    public void testDeletePartitionedTopic() throws Exception {
+        super.testDeletePartitionedTopic();
+    }
+}

Reply via email to