This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch branch-2.8 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit d6aa7b44c2ee5972e0236a0596fe662afadb9565 Author: lipenghui <[email protected]> AuthorDate: Fri Jul 30 03:26:52 2021 +0800 Do not create system topic for heartbeat namespace (#11499) (cherry picked from commit 6d8cbc7924f06a399a4f41b8f68b57ee30341137) --- .../broker/service/SystemTopicBasedTopicPoliciesService.java | 10 ++++++++++ .../org/apache/pulsar/broker/admin/TopicPoliciesTest.java | 11 +++++++++++ 2 files changed, 21 insertions(+) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java index a76c8af..ab4a521 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java @@ -28,6 +28,7 @@ import java.util.concurrent.atomic.AtomicInteger; import org.apache.pulsar.broker.PulsarServerException; import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.namespace.NamespaceBundleOwnershipListener; +import org.apache.pulsar.broker.namespace.NamespaceService; import org.apache.pulsar.broker.service.BrokerServiceException.TopicPoliciesCacheNotInitException; import org.apache.pulsar.broker.systopic.NamespaceEventsSystemTopicFactory; import org.apache.pulsar.broker.systopic.SystemTopicClient; @@ -180,6 +181,11 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic public CompletableFuture<Void> addOwnedNamespaceBundleAsync(NamespaceBundle namespaceBundle) { CompletableFuture<Void> result = new CompletableFuture<>(); NamespaceName namespace = namespaceBundle.getNamespaceObject(); + if (NamespaceService.checkHeartbeatNamespace(namespace) != null + || NamespaceService.checkHeartbeatNamespaceV2(namespace) != null) { + result.complete(null); + return result; + } createSystemTopicFactoryIfNeeded(); synchronized (this) { if (readerCaches.get(namespace) != null) { @@ -210,6 +216,10 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic @Override public CompletableFuture<Void> removeOwnedNamespaceBundleAsync(NamespaceBundle namespaceBundle) { NamespaceName namespace = namespaceBundle.getNamespaceObject(); + if (NamespaceService.checkHeartbeatNamespace(namespace) != null + || NamespaceService.checkHeartbeatNamespaceV2(namespace) != null) { + return CompletableFuture.completedFuture(null); + } AtomicInteger bundlesCount = ownedBundlesCountPerNamespace.get(namespace); if (bundlesCount == null || bundlesCount.decrementAndGet() <= 0) { CompletableFuture<SystemTopicClient.Reader<PulsarEvent>> readerCompletableFuture = diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java index 25766e8..8b8d944 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java @@ -25,6 +25,7 @@ import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.mledger.ManagedLedgerConfig; import org.apache.pulsar.broker.ConfigHelper; import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; +import org.apache.pulsar.broker.namespace.NamespaceService; import org.apache.pulsar.broker.service.BacklogQuotaManager; import org.apache.pulsar.broker.service.PublishRateLimiterImpl; import org.apache.pulsar.broker.service.Topic; @@ -2426,4 +2427,14 @@ public class TopicPoliciesTest extends MockedPulsarServiceBaseTest { .isNull()); } + @Test + public void testDoNotCreateSystemTopicForHeartbeatNamespace() { + assertTrue(pulsar.getBrokerService().getTopics().size() > 0); + pulsar.getBrokerService().getTopics().forEach((k, v) -> { + TopicName topicName = TopicName.get(k); + assertNull(NamespaceService.checkHeartbeatNamespace(topicName.getNamespaceObject())); + assertNull(NamespaceService.checkHeartbeatNamespaceV2(topicName.getNamespaceObject())); + }); + } + }
