This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new 7d097f7250f [fix][broker] Fix heartbeat namespace create event topic 
and cannot delete heartbeat topic (#21360)
7d097f7250f is described below

commit 7d097f7250f4ddd25da984287a1067f64a59deda
Author: ken <[email protected]>
AuthorDate: Thu Oct 19 18:39:41 2023 +0800

    [fix][broker] Fix heartbeat namespace create event topic and cannot delete 
heartbeat topic (#21360)
    
    Co-authored-by: fanjianye <[email protected]>
    Co-authored-by: Jiwei Guo <[email protected]>
---
 .../SystemTopicBasedTopicPoliciesService.java      | 14 +++++++++----
 .../systopic/PartitionedSystemTopicTest.java       | 23 ++++++++++++++++++++++
 2 files changed, 33 insertions(+), 4 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
index ed76d37ae25..da312340954 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
@@ -95,20 +95,23 @@ public class SystemTopicBasedTopicPoliciesService 
implements TopicPoliciesServic
 
     @Override
     public CompletableFuture<Void> deleteTopicPoliciesAsync(TopicName 
topicName) {
+        if 
(NamespaceService.isHeartbeatNamespace(topicName.getNamespaceObject())) {
+            return CompletableFuture.completedFuture(null);
+        }
         return sendTopicPolicyEvent(topicName, ActionType.DELETE, null);
     }
 
     @Override
     public CompletableFuture<Void> updateTopicPoliciesAsync(TopicName 
topicName, TopicPolicies policies) {
+        if 
(NamespaceService.isHeartbeatNamespace(topicName.getNamespaceObject())) {
+            return CompletableFuture.failedFuture(new 
BrokerServiceException.NotAllowedException(
+                    "Not allowed to update topic policy for the heartbeat 
topic"));
+        }
         return sendTopicPolicyEvent(topicName, ActionType.UPDATE, policies);
     }
 
     private CompletableFuture<Void> sendTopicPolicyEvent(TopicName topicName, 
ActionType actionType,
                                                          TopicPolicies 
policies) {
-        if 
(NamespaceService.isHeartbeatNamespace(topicName.getNamespaceObject())) {
-            return CompletableFuture.failedFuture(
-                    new BrokerServiceException.NotAllowedException("Not 
allowed to send event to health check topic"));
-        }
         return pulsarService.getPulsarResources().getNamespaceResources()
                 .getPoliciesAsync(topicName.getNamespaceObject())
                 .thenCompose(namespacePolicies -> {
@@ -220,6 +223,9 @@ public class SystemTopicBasedTopicPoliciesService 
implements TopicPoliciesServic
     @Override
     public TopicPolicies getTopicPolicies(TopicName topicName,
                                           boolean isGlobal) throws 
TopicPoliciesCacheNotInitException {
+        if 
(NamespaceService.isHeartbeatNamespace(topicName.getNamespaceObject())) {
+            return null;
+        }
         if (!policyCacheInitMap.containsKey(topicName.getNamespaceObject())) {
             NamespaceName namespace = topicName.getNamespaceObject();
             prepareInitPoliciesCacheAsync(namespace);
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java
index 42d941e6168..416d7ed0270 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java
@@ -191,6 +191,13 @@ public class PartitionedSystemTopicTest extends 
BrokerTestBase {
         Optional<Topic> optionalTopic = pulsar.getBrokerService()
                 .getTopic(topicName.getPartition(1).toString(), false).join();
         Assert.assertTrue(optionalTopic.isEmpty());
+
+        TopicName heartbeatTopicName = TopicName.get("persistent",
+                namespaceName, BrokersBase.HEALTH_CHECK_TOPIC_SUFFIX);
+        admin.topics().getRetention(heartbeatTopicName.toString());
+        optionalTopic = pulsar.getBrokerService()
+                .getTopic(topicName.getPartition(1).toString(), false).join();
+        Assert.assertTrue(optionalTopic.isEmpty());
     }
 
     @Test
@@ -208,6 +215,22 @@ public class PartitionedSystemTopicTest extends 
BrokerTestBase {
         });
     }
 
+    @Test
+    public void testHeartbeatTopicBeDeleted() throws Exception {
+        admin.brokers().healthcheck(TopicVersion.V2);
+        NamespaceName namespaceName = 
NamespaceService.getHeartbeatNamespaceV2(pulsar.getLookupServiceAddress(),
+                pulsar.getConfig());
+        TopicName heartbeatTopicName = TopicName.get("persistent", 
namespaceName, BrokersBase.HEALTH_CHECK_TOPIC_SUFFIX);
+
+        List<String> topics = 
getPulsar().getNamespaceService().getListOfPersistentTopics(namespaceName).join();
+        Assert.assertEquals(topics.size(), 1);
+        Assert.assertEquals(topics.get(0), heartbeatTopicName.toString());
+
+        admin.topics().delete(heartbeatTopicName.toString(), true);
+        topics = 
getPulsar().getNamespaceService().getListOfPersistentTopics(namespaceName).join();
+        Assert.assertEquals(topics.size(), 0);
+    }
+  
     @Test
     public void testHeartbeatNamespaceNotCreateTransactionInternalTopic() 
throws Exception {
         admin.brokers().healthcheck(TopicVersion.V2);

Reply via email to