This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit ad23be5c062a5a44d8b8549b7be04fb69b4bfe33
Author: lipenghui <[email protected]>
AuthorDate: Fri Jul 30 03:26:52 2021 +0800

    Do not create system topic for heartbeat namespace (#11499)
    
    (cherry picked from commit 6d8cbc7924f06a399a4f41b8f68b57ee30341137)
---
 .../broker/service/SystemTopicBasedTopicPoliciesService.java | 12 ++++++++++--
 .../org/apache/pulsar/broker/admin/TopicPoliciesTest.java    | 10 ++++++++++
 2 files changed, 20 insertions(+), 2 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
index 29d004d..18388f1 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
@@ -29,12 +29,13 @@ import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.pulsar.broker.PulsarServerException;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.namespace.NamespaceBundleOwnershipListener;
+import org.apache.pulsar.broker.namespace.NamespaceService;
+import org.apache.pulsar.broker.systopic.NamespaceEventsSystemTopicFactory;
+import org.apache.pulsar.broker.systopic.SystemTopicClient;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.common.events.ActionType;
 import org.apache.pulsar.common.events.EventType;
-import org.apache.pulsar.broker.systopic.NamespaceEventsSystemTopicFactory;
-import org.apache.pulsar.broker.systopic.SystemTopicClient;
 import org.apache.pulsar.common.events.PulsarEvent;
 import org.apache.pulsar.common.events.TopicPoliciesEvent;
 import org.apache.pulsar.common.naming.NamespaceBundle;
@@ -169,6 +170,10 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic
     public CompletableFuture<Void> addOwnedNamespaceBundleAsync(NamespaceBundle namespaceBundle) {
         CompletableFuture<Void> result = new CompletableFuture<>();
         NamespaceName namespace = namespaceBundle.getNamespaceObject();
+        if (NamespaceService.checkHeartbeatNamespace(namespace) != null) {
+            result.complete(null);
+            return result;
+        }
         createSystemTopicFactoryIfNeeded();
         synchronized (this) {
             if (readerCaches.get(namespace) != null) {
@@ -198,6 +203,9 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic
     @Override
     public CompletableFuture<Void> removeOwnedNamespaceBundleAsync(NamespaceBundle namespaceBundle) {
         NamespaceName namespace = namespaceBundle.getNamespaceObject();
+        if (NamespaceService.checkHeartbeatNamespace(namespace) != null) {
+            return CompletableFuture.completedFuture(null);
+        }
         AtomicInteger bundlesCount = ownedBundlesCountPerNamespace.get(namespace);
         if (bundlesCount == null || bundlesCount.decrementAndGet() <= 0) {
             CompletableFuture<SystemTopicClient.Reader> readerCompletableFuture = readerCaches.remove(namespace);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
index 1613911..a1f226f 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
@@ -22,6 +22,7 @@ import com.google.common.collect.Sets;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
+import org.apache.pulsar.broker.namespace.NamespaceService;
 import org.apache.pulsar.broker.service.BacklogQuotaManager;
 import org.apache.pulsar.broker.service.PublishRateLimiterImpl;
 import org.apache.pulsar.broker.service.Topic;
@@ -1577,4 +1578,13 @@ public class TopicPoliciesTest extends MockedPulsarServiceBaseTest {
                     Assert.assertTrue(iStats.compactedLedger.ledgerId != previousCompactedLedgerId);
                 });
     }
+
+    @Test
+    public void testDoNotCreateSystemTopicForHeartbeatNamespace() {
+        assertTrue(pulsar.getBrokerService().getTopics().size() > 0);
+        pulsar.getBrokerService().getTopics().forEach((k, v) -> {
+            TopicName topicName = TopicName.get(k);
+            assertNull(NamespaceService.checkHeartbeatNamespace(topicName.getNamespaceObject()));
+        });
+    }
 }

Reply via email to