This is an automated email from the ASF dual-hosted git repository.

yubiao pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
     new da70f9ed173 [fix] [broker] Topics failed to delete after remove cluster from replicated clusters set and caused OOM (#23360)
da70f9ed173 is described below

commit da70f9ed173634ef55e07aeef13e07cd6571ce89
Author: fengyubiao <[email protected]>
AuthorDate: Mon Oct 14 12:50:02 2024 +0800

    [fix] [broker] Topics failed to delete after remove cluster from replicated clusters set and caused OOM (#23360)
    
    (cherry picked from commit d9bc7af60b2e1afc9a1ca4bd8f3505bcd1c4e06b)
---
 .../SystemTopicBasedTopicPoliciesService.java      | 11 ++++-
 .../NamespaceEventsSystemTopicFactory.java         |  8 +++-
 .../service/OneWayReplicatorUsingGlobalZKTest.java | 48 ++++++++++++++++++++++
 3 files changed, 64 insertions(+), 3 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
index e344d892b31..80832882413 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
@@ -127,7 +127,16 @@ public class SystemTopicBasedTopicPoliciesService 
implements TopicPoliciesServic
         if 
(NamespaceService.isHeartbeatNamespace(topicName.getNamespaceObject())) {
             return CompletableFuture.completedFuture(null);
         }
-        return sendTopicPolicyEvent(topicName, ActionType.DELETE, null);
+        TopicName changeEvents = 
NamespaceEventsSystemTopicFactory.getEventsTopicName(topicName.getNamespaceObject());
+        return 
pulsarService.getNamespaceService().checkTopicExists(changeEvents).thenCompose(exists
 -> {
+            // If the system topic named "__change_events" has been deleted, it means all the data in the topic have
+            // been deleted, so we do not need to delete the message that we want to delete again.
+            if (!exists) {
+                log.info("Skip delete topic-level policies because {} has been 
removed before", changeEvents);
+                return CompletableFuture.completedFuture(null);
+            }
+            return sendTopicPolicyEvent(topicName, ActionType.DELETE, null);
+        });
     }
 
     @Override
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/NamespaceEventsSystemTopicFactory.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/NamespaceEventsSystemTopicFactory.java
index f5e6c7748d1..199026bc4c4 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/NamespaceEventsSystemTopicFactory.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/NamespaceEventsSystemTopicFactory.java
@@ -37,12 +37,16 @@ public class NamespaceEventsSystemTopicFactory {
     }
 
     public TopicPoliciesSystemTopicClient 
createTopicPoliciesSystemTopicClient(NamespaceName namespaceName) {
-        TopicName topicName = TopicName.get(TopicDomain.persistent.value(), 
namespaceName,
-                SystemTopicNames.NAMESPACE_EVENTS_LOCAL_NAME);
+        TopicName topicName = getEventsTopicName(namespaceName);
         log.info("Create topic policies system topic client {}", 
topicName.toString());
         return new TopicPoliciesSystemTopicClient(client, topicName);
     }
 
+    public static TopicName getEventsTopicName(NamespaceName namespaceName) {
+        return TopicName.get(TopicDomain.persistent.value(), namespaceName,
+                SystemTopicNames.NAMESPACE_EVENTS_LOCAL_NAME);
+    }
+
     public <T> TransactionBufferSnapshotBaseSystemTopicClient<T> 
createTransactionBufferSystemTopicClient(
             TopicName systemTopicName, SystemTopicTxnBufferSnapshotService<T>
             systemTopicTxnBufferSnapshotService, Class<T> schemaType) {
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java
index b8f8edce247..37cb02c301a 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java
@@ -19,6 +19,18 @@
 package org.apache.pulsar.broker.service;
 
 import lombok.extern.slf4j.Slf4j;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
+import org.awaitility.Awaitility;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
@@ -109,4 +121,40 @@ public class OneWayReplicatorUsingGlobalZKTest extends 
OneWayReplicatorTest {
     public void testReloadWithTopicLevelGeoReplication(ReplicationLevel 
replicationLevel) throws Exception {
         super.testReloadWithTopicLevelGeoReplication(replicationLevel);
     }
+
+    @Test
+    public void testRemoveCluster() throws Exception {
+        // Initialize.
+        final String ns1 = defaultTenant + "/" + 
"ns_73b1a31afce34671a5ddc48fe5ad7fc8";
+        final String topic = "persistent://" + ns1 + 
"/___tp-5dd50794-7af8-4a34-8a0b-06188052c66a";
+        final String topicChangeEvents = "persistent://" + ns1 + 
"/__change_events";
+        admin1.namespaces().createNamespace(ns1);
+        admin1.namespaces().setNamespaceReplicationClusters(ns1, new 
HashSet<>(Arrays.asList(cluster1, cluster2)));
+        admin1.topics().createNonPartitionedTopic(topic);
+
+        // Wait for loading topic up.
+        Producer<String> p = 
client1.newProducer(Schema.STRING).topic(topic).create();
+        Awaitility.await().untilAsserted(() -> {
+            ConcurrentOpenHashMap<String, CompletableFuture<Optional<Topic>>> 
tps = pulsar1.getBrokerService().getTopics();
+            assertTrue(tps.containsKey(topic));
+            assertTrue(tps.containsKey(topicChangeEvents));
+        });
+
+        // The topics under the namespace of the cluster-1 will be deleted.
+        // Verify the result.
+        admin1.namespaces().setNamespaceReplicationClusters(ns1, new 
HashSet<>(Arrays.asList(cluster2)));
+        Awaitility.await().atMost(Duration.ofSeconds(120)).untilAsserted(() -> 
{
+            ConcurrentOpenHashMap<String, CompletableFuture<Optional<Topic>>> 
tps = pulsar1.getBrokerService().getTopics();
+            assertFalse(tps.containsKey(topic));
+            assertFalse(tps.containsKey(topicChangeEvents));
+            
assertFalse(pulsar1.getNamespaceService().checkTopicExists(TopicName.get(topic)).join());
+            assertFalse(pulsar1.getNamespaceService()
+                    
.checkTopicExists(TopicName.get(topicChangeEvents)).join());
+        });
+
+        // cleanup.
+        p.close();
+        admin2.topics().delete(topic);
+        admin2.namespaces().deleteNamespace(ns1);
+    }
 }

Reply via email to