This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-4.0 by this push:
     new f10df17f27e [fix] [broker] topics infinitely failed to delete after 
remove cluster from replicated clusters modifying when using partitioned system 
topic (#24097)
f10df17f27e is described below

commit f10df17f27e3cf44beefe239cfd166ded61d4d8c
Author: fengyubiao <yubiao.f...@streamnative.io>
AuthorDate: Sat Mar 29 00:40:26 2025 +0800

    [fix] [broker] topics infinitely failed to delete after remove cluster from 
replicated clusters modifying when using partitioned system topic (#24097)
    
    (cherry picked from commit abd51212839cec931a22b5ee293802a0b78175ec)
---
 .../SystemTopicBasedTopicPoliciesService.java      |  21 ++-
 ...OneWayReplicatorUsingGlobalPartitionedTest.java | 207 +++++++++++++++++++++
 2 files changed, 226 insertions(+), 2 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
index f2206f5cada..7bf6119de34 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
@@ -137,10 +137,27 @@ public class SystemTopicBasedTopicPoliciesService 
implements TopicPoliciesServic
             return CompletableFuture.completedFuture(null);
         }
         TopicName changeEvents = 
NamespaceEventsSystemTopicFactory.getEventsTopicName(topicName.getNamespaceObject());
-        return 
pulsarService.getNamespaceService().checkTopicExists(changeEvents).thenCompose(topicExistsInfo
 -> {
+        CompletableFuture<Boolean> changeEventTopicExists = 
pulsarService.getPulsarResources().getTopicResources()
+                
.persistentTopicExists(changeEvents).thenCompose(nonPartitionedExists -> {
+            if (!nonPartitionedExists) {
+                // Check whether a partitioned __change_events topic exists.
+                // We check the first partition instead of the partitioned metadata, because there is a case
+                // in which checking the partitioned metadata does not work.
+                // The details of that case:
+                // 1. Start 2 clusters: c1 and c2.
+                // 2. Enable replication between c1 and c2 with a global ZK.
+                // 3. The partitioned metadata is shared by c1 and c2.
+                // 4. When the topic is deleted from c1, Pulsar deletes only the partitions, because c2 is still
+                //    using the partitioned metadata.
+                return pulsarService.getPulsarResources().getTopicResources()
+                        .persistentTopicExists(changeEvents.getPartition(0));
+            }
+            return CompletableFuture.completedFuture(true);
+        });
+        return changeEventTopicExists.thenCompose(exists -> {
             // If the system topic named "__change_events" has been deleted, 
it means all the data in the topic have
             // been deleted, so we do not need to delete the message that we 
want to delete again.
-            if (!topicExistsInfo.isExists()) {
+            if (!exists) {
                 log.info("Skip delete topic-level policies because {} has been 
removed before", changeEvents);
                 return CompletableFuture.completedFuture(null);
             }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalPartitionedTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalPartitionedTest.java
new file mode 100644
index 00000000000..2a2a1befd16
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalPartitionedTest.java
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service;
+
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.policies.data.TopicType;
+import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
+import org.apache.pulsar.zookeeper.ZookeeperServerTest;
+import org.awaitility.Awaitility;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+/**
+ * Variant of {@link OneWayReplicatorTest} that runs with {@code usingGlobalZK} enabled and with
+ * auto-created topics configured as partitioned topics that have a single partition.
+ *
+ * <p>Every test overridden from {@link OneWayReplicatorTest} in this class is disabled;
+ * {@link #testRemoveCluster()} is the only test this class adds.
+ */
+@Slf4j
+@Test(groups = "broker")
+public class OneWayReplicatorUsingGlobalPartitionedTest extends OneWayReplicatorTest {
+
+    /** Turns on {@code usingGlobalZK} before running the parent setup. */
+    @Override
+    @BeforeClass(alwaysRun = true, timeOut = 300000)
+    public void setup() throws Exception {
+        super.usingGlobalZK = true;
+        super.setup();
+    }
+
+    /** Delegates to the parent cleanup. */
+    @Override
+    @AfterClass(alwaysRun = true, timeOut = 300000)
+    public void cleanup() throws Exception {
+        super.cleanup();
+    }
+
+    /**
+     * Applies the parent defaults, then makes auto-created topics partitioned with one partition.
+     */
+    @Override
+    protected void setConfigDefaults(ServiceConfiguration config, String clusterName,
+                                     LocalBookkeeperEnsemble bookkeeperEnsemble, ZookeeperServerTest brokerConfigZk) {
+        super.setConfigDefaults(config, clusterName, bookkeeperEnsemble, brokerConfigZk);
+        config.setAllowAutoTopicCreationType(TopicType.PARTITIONED);
+        config.setDefaultNumPartitions(1);
+    }
+
+    // The inherited tests below are disabled for this class. Each override delegates to the parent
+    // method of the same name, so re-enabling one runs the matching parent test.
+
+    @Override
+    @Test(enabled = false)
+    public void testReplicatorProducerStatInTopic() throws Exception {
+        super.testReplicatorProducerStatInTopic();
+    }
+
+    @Override
+    @Test(enabled = false)
+    public void testCreateRemoteConsumerFirst() throws Exception {
+        super.testCreateRemoteConsumerFirst();
+    }
+
+    @Override
+    @Test(enabled = false)
+    public void testTopicCloseWhenInternalProducerCloseErrorOnce() throws Exception {
+        super.testTopicCloseWhenInternalProducerCloseErrorOnce();
+    }
+
+    @Override
+    @Test(enabled = false)
+    public void testConcurrencyOfUnloadBundleAndRecreateProducer() throws Exception {
+        super.testConcurrencyOfUnloadBundleAndRecreateProducer();
+    }
+
+    @Override
+    @Test(enabled = false)
+    public void testPartitionedTopicLevelReplication() throws Exception {
+        super.testPartitionedTopicLevelReplication();
+    }
+
+    @Override
+    @Test(enabled = false)
+    public void testPartitionedTopicLevelReplicationRemoteTopicExist() throws Exception {
+        super.testPartitionedTopicLevelReplicationRemoteTopicExist();
+    }
+
+    @Override
+    @Test(enabled = false)
+    public void testPartitionedTopicLevelReplicationRemoteConflictTopicExist() throws Exception {
+        super.testPartitionedTopicLevelReplicationRemoteConflictTopicExist();
+    }
+
+    @Override
+    @Test(enabled = false)
+    public void testConcurrencyOfUnloadBundleAndRecreateProducer2() throws Exception {
+        super.testConcurrencyOfUnloadBundleAndRecreateProducer2();
+    }
+
+    @Override
+    @Test(enabled = false)
+    public void testUnFenceTopicToReuse() throws Exception {
+        super.testUnFenceTopicToReuse();
+    }
+
+    @Override
+    @Test(enabled = false)
+    public void testDeleteNonPartitionedTopic() throws Exception {
+        super.testDeleteNonPartitionedTopic();
+    }
+
+    @Override
+    @Test(enabled = false)
+    public void testDeletePartitionedTopic() throws Exception {
+        super.testDeletePartitionedTopic();
+    }
+
+    @Override
+    @Test(enabled = false)
+    public void testNoExpandTopicPartitionsWhenDisableTopicLevelReplication() throws Exception {
+        super.testNoExpandTopicPartitionsWhenDisableTopicLevelReplication();
+    }
+
+    @Override
+    @Test(enabled = false)
+    public void testExpandTopicPartitionsOnNamespaceLevelReplication() throws Exception {
+        super.testExpandTopicPartitionsOnNamespaceLevelReplication();
+    }
+
+    @Override
+    @Test(enabled = false)
+    public void testReloadWithTopicLevelGeoReplication(ReplicationLevel replicationLevel) throws Exception {
+        super.testReloadWithTopicLevelGeoReplication(replicationLevel);
+    }
+
+    @Override
+    @Test(enabled = false)
+    public void testConfigReplicationStartAt() throws Exception {
+        super.testConfigReplicationStartAt();
+    }
+
+    @Override
+    @Test(enabled = false)
+    public void testDifferentTopicCreationRule(ReplicationMode replicationMode) throws Exception {
+        super.testDifferentTopicCreationRule(replicationMode);
+    }
+
+    @Override
+    @Test(enabled = false)
+    public void testReplicationCountMetrics() throws Exception {
+        super.testReplicationCountMetrics();
+    }
+
+    /**
+     * Verifies that, after cluster1 is removed from a namespace's replication clusters, neither the
+     * user topic nor {@code __change_events-partition-0} stays loaded on {@code pulsar1}, and that
+     * {@code checkTopicExists} reports both as absent.
+     */
+    @Test(timeOut = 60_000)
+    public void testRemoveCluster() throws Exception {
+        // Initialize.
+        final String ns1 = defaultTenant + "/" + "ns_73b1a31afce34671a5ddc48fe5ad7fc8";
+        final String topic = "persistent://" + ns1 + "/___tp-5dd50794-7af8-4a34-8a0b-06188052c66a";
+        final String topicChangeEvents = "persistent://" + ns1 + "/__change_events-partition-0";
+        admin1.namespaces().createNamespace(ns1);
+        admin1.namespaces().setNamespaceReplicationClusters(ns1, new HashSet<>(Arrays.asList(cluster1, cluster2)));
+        admin1.topics().createNonPartitionedTopic(topic);
+
+        // try-with-resources closes the producer even when one of the checks below fails.
+        try (Producer<String> p = client1.newProducer(Schema.STRING).topic(topic).create()) {
+            // Wait for loading topic up.
+            Awaitility.await().untilAsserted(() -> {
+                Map<String, CompletableFuture<Optional<Topic>>> tps = pulsar1.getBrokerService().getTopics();
+                assertTrue(tps.containsKey(topic));
+                assertTrue(tps.containsKey(topicChangeEvents));
+            });
+
+            // The topics under the namespace of the cluster-1 will be deleted.
+            // Verify the result.
+            admin1.namespaces().setNamespaceReplicationClusters(ns1, new HashSet<>(Arrays.asList(cluster2)));
+            Awaitility.await().atMost(Duration.ofSeconds(60)).ignoreExceptions().untilAsserted(() -> {
+                Map<String, CompletableFuture<Optional<Topic>>> tps = pulsar1.getBrokerService().getTopics();
+                assertFalse(tps.containsKey(topic));
+                assertFalse(tps.containsKey(topicChangeEvents));
+                assertFalse(pulsar1.getNamespaceService().checkTopicExists(TopicName.get(topic))
+                        .get(5, TimeUnit.SECONDS).isExists());
+                assertFalse(pulsar1.getNamespaceService()
+                        .checkTopicExists(TopicName.get(topicChangeEvents))
+                        .get(5, TimeUnit.SECONDS).isExists());
+            });
+        }
+
+        // cleanup.
+        admin2.topics().delete(topic);
+        admin2.namespaces().deleteNamespace(ns1);
+    }
+}

Reply via email to