This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit d6aa7b44c2ee5972e0236a0596fe662afadb9565
Author: lipenghui <[email protected]>
AuthorDate: Fri Jul 30 03:26:52 2021 +0800

    Do not create system topic for heartbeat namespace (#11499)
    
    
    (cherry picked from commit 6d8cbc7924f06a399a4f41b8f68b57ee30341137)
---
 .../broker/service/SystemTopicBasedTopicPoliciesService.java  | 10 ++++++++++
 .../org/apache/pulsar/broker/admin/TopicPoliciesTest.java     | 11 +++++++++++
 2 files changed, 21 insertions(+)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
index a76c8af..ab4a521 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
@@ -28,6 +28,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.pulsar.broker.PulsarServerException;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.namespace.NamespaceBundleOwnershipListener;
+import org.apache.pulsar.broker.namespace.NamespaceService;
 import 
org.apache.pulsar.broker.service.BrokerServiceException.TopicPoliciesCacheNotInitException;
 import org.apache.pulsar.broker.systopic.NamespaceEventsSystemTopicFactory;
 import org.apache.pulsar.broker.systopic.SystemTopicClient;
@@ -180,6 +181,11 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic
     public CompletableFuture<Void> 
addOwnedNamespaceBundleAsync(NamespaceBundle namespaceBundle) {
         CompletableFuture<Void> result = new CompletableFuture<>();
         NamespaceName namespace = namespaceBundle.getNamespaceObject();
+        if (NamespaceService.checkHeartbeatNamespace(namespace) != null
+                || NamespaceService.checkHeartbeatNamespaceV2(namespace) != 
null) {
+            result.complete(null);
+            return result;
+        }
         createSystemTopicFactoryIfNeeded();
         synchronized (this) {
             if (readerCaches.get(namespace) != null) {
@@ -210,6 +216,10 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic
     @Override
     public CompletableFuture<Void> 
removeOwnedNamespaceBundleAsync(NamespaceBundle namespaceBundle) {
         NamespaceName namespace = namespaceBundle.getNamespaceObject();
+        if (NamespaceService.checkHeartbeatNamespace(namespace) != null
+                || NamespaceService.checkHeartbeatNamespaceV2(namespace) != 
null) {
+            return CompletableFuture.completedFuture(null);
+        }
         AtomicInteger bundlesCount = 
ownedBundlesCountPerNamespace.get(namespace);
         if (bundlesCount == null || bundlesCount.decrementAndGet() <= 0) {
             CompletableFuture<SystemTopicClient.Reader<PulsarEvent>> 
readerCompletableFuture =
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
index 25766e8..8b8d944 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
@@ -25,6 +25,7 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
 import org.apache.pulsar.broker.ConfigHelper;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
+import org.apache.pulsar.broker.namespace.NamespaceService;
 import org.apache.pulsar.broker.service.BacklogQuotaManager;
 import org.apache.pulsar.broker.service.PublishRateLimiterImpl;
 import org.apache.pulsar.broker.service.Topic;
@@ -2426,4 +2427,14 @@ public class TopicPoliciesTest extends MockedPulsarServiceBaseTest {
                         .isNull());
     }
 
+    @Test
+    public void testDoNotCreateSystemTopicForHeartbeatNamespace() {
+        assertTrue(pulsar.getBrokerService().getTopics().size() > 0);
+        pulsar.getBrokerService().getTopics().forEach((k, v) -> {
+            TopicName topicName = TopicName.get(k);
+            
assertNull(NamespaceService.checkHeartbeatNamespace(topicName.getNamespaceObject()));
+            
assertNull(NamespaceService.checkHeartbeatNamespaceV2(topicName.getNamespaceObject()));
+        });
+    }
+
 }

Reply via email to