This is an automated email from the ASF dual-hosted git repository.
yubiao pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-3.3 by this push:
new 97d118500ea [fix] [broker] Topics failed to delete after remove
cluster from replicated clusters set and caused OOM (#23360)
97d118500ea is described below
commit 97d118500ea61a9b832c5156c009dfefa61c535d
Author: fengyubiao <[email protected]>
AuthorDate: Mon Oct 14 12:50:02 2024 +0800
[fix] [broker] Topics failed to delete after remove cluster from replicated
clusters set and caused OOM (#23360)
(cherry picked from commit d9bc7af60b2e1afc9a1ca4bd8f3505bcd1c4e06b)
---
.../SystemTopicBasedTopicPoliciesService.java | 11 ++++-
.../NamespaceEventsSystemTopicFactory.java | 8 +++-
.../service/OneWayReplicatorUsingGlobalZKTest.java | 48 ++++++++++++++++++++++
3 files changed, 64 insertions(+), 3 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
index 8054d781976..746c4dc13ff 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
@@ -135,7 +135,16 @@ public class SystemTopicBasedTopicPoliciesService
implements TopicPoliciesServic
if
(NamespaceService.isHeartbeatNamespace(topicName.getNamespaceObject())) {
return CompletableFuture.completedFuture(null);
}
- return sendTopicPolicyEvent(topicName, ActionType.DELETE, null);
+ TopicName changeEvents =
NamespaceEventsSystemTopicFactory.getEventsTopicName(topicName.getNamespaceObject());
+ return
pulsarService.getNamespaceService().checkTopicExists(changeEvents).thenCompose(topicExistsInfo
-> {
+ // If "__change_events" has been deleted, every message in it, including this topic's policies, is gone
+ // already; skip sending the DELETE event because there is nothing left to delete.
+ if (!topicExistsInfo.isExists()) {
+ log.info("Skip delete topic-level policies because {} has been
removed before", changeEvents);
+ return CompletableFuture.completedFuture(null);
+ }
+ return sendTopicPolicyEvent(topicName, ActionType.DELETE, null);
+ });
}
@Override
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/NamespaceEventsSystemTopicFactory.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/NamespaceEventsSystemTopicFactory.java
index f5e6c7748d1..199026bc4c4 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/NamespaceEventsSystemTopicFactory.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/NamespaceEventsSystemTopicFactory.java
@@ -37,12 +37,16 @@ public class NamespaceEventsSystemTopicFactory {
}
public TopicPoliciesSystemTopicClient
createTopicPoliciesSystemTopicClient(NamespaceName namespaceName) {
- TopicName topicName = TopicName.get(TopicDomain.persistent.value(),
namespaceName,
- SystemTopicNames.NAMESPACE_EVENTS_LOCAL_NAME);
+ TopicName topicName = getEventsTopicName(namespaceName);
log.info("Create topic policies system topic client {}",
topicName.toString());
return new TopicPoliciesSystemTopicClient(client, topicName);
}
+ public static TopicName getEventsTopicName(NamespaceName namespaceName) {
+ final String persistentDomain = TopicDomain.persistent.value();
+ return TopicName.get(persistentDomain, namespaceName, SystemTopicNames.NAMESPACE_EVENTS_LOCAL_NAME);
+ }
+
public <T> TransactionBufferSnapshotBaseSystemTopicClient<T>
createTransactionBufferSystemTopicClient(
TopicName systemTopicName, SystemTopicTxnBufferSnapshotService<T>
systemTopicTxnBufferSnapshotService, Class<T> schemaType) {
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java
index 34810bbe905..83b9b32ce85 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java
@@ -19,17 +19,25 @@
package org.apache.pulsar.broker.service;
import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
+import java.time.Duration;
import java.util.Arrays;
import java.util.HashSet;
+import java.util.Optional;
import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
+import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.awaitility.Awaitility;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
@@ -167,4 +175,44 @@ public class OneWayReplicatorUsingGlobalZKTest extends
OneWayReplicatorTest {
public void testDifferentTopicCreationRule(ReplicationMode
replicationMode) throws Exception {
super.testDifferentTopicCreationRule(replicationMode);
}
+
+ @Test
+ public void testRemoveCluster() throws Exception {
+ // Initialize.
+ final String ns1 = defaultTenant + "/" +
"ns_73b1a31afce34671a5ddc48fe5ad7fc8";
+ final String topic = "persistent://" + ns1 +
"/___tp-5dd50794-7af8-4a34-8a0b-06188052c66a";
+ final String topicChangeEvents = "persistent://" + ns1 +
"/__change_events";
+ admin1.namespaces().createNamespace(ns1);
+ admin1.namespaces().setNamespaceReplicationClusters(ns1, new
HashSet<>(Arrays.asList(cluster1, cluster2)));
+ admin1.topics().createNonPartitionedTopic(topic);
+
+ // Wait until pulsar1 has loaded both the topic and the namespace's "__change_events" system topic.
+ Producer<String> p =
client1.newProducer(Schema.STRING).topic(topic).create();
+ try {
+ Awaitility.await().untilAsserted(() -> {
+ ConcurrentOpenHashMap<String, CompletableFuture<Optional<Topic>>>
tps
+ = pulsar1.getBrokerService().getTopics();
+ assertTrue(tps.containsKey(topic));
+ assertTrue(tps.containsKey(topicChangeEvents));
+ });
+
+ // Removing cluster1 from the replication clusters should make pulsar1 delete the namespace's topics,
+ // including the "__change_events" system topic. Verify the result.
+ admin1.namespaces().setNamespaceReplicationClusters(ns1, new
HashSet<>(Arrays.asList(cluster2)));
+ Awaitility.await().atMost(Duration.ofSeconds(120)).untilAsserted(() ->
{
+ ConcurrentOpenHashMap<String, CompletableFuture<Optional<Topic>>>
tps
+ = pulsar1.getBrokerService().getTopics();
+ assertFalse(tps.containsKey(topic));
+ assertFalse(tps.containsKey(topicChangeEvents));
+
assertFalse(pulsar1.getNamespaceService().checkTopicExists(TopicName.get(topic)).join().isExists());
+ assertFalse(pulsar1.getNamespaceService()
+
.checkTopicExists(TopicName.get(topicChangeEvents)).join().isExists());
+ });
+ } finally {
+ // Cleanup runs even if an assertion above fails, so the producer and the namespace are not leaked.
+ p.close();
+ admin2.topics().delete(topic);
+ admin2.namespaces().deleteNamespace(ns1);
+ }
+ }
}