This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch branch-2.7 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit ad23be5c062a5a44d8b8549b7be04fb69b4bfe33 Author: lipenghui <[email protected]> AuthorDate: Fri Jul 30 03:26:52 2021 +0800 Do not create system topic for heartbeat namespace (#11499) (cherry picked from commit 6d8cbc7924f06a399a4f41b8f68b57ee30341137) --- .../broker/service/SystemTopicBasedTopicPoliciesService.java | 12 ++++++++++-- .../org/apache/pulsar/broker/admin/TopicPoliciesTest.java | 10 ++++++++++ 2 files changed, 20 insertions(+), 2 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java index 29d004d..18388f1 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java @@ -29,12 +29,13 @@ import java.util.concurrent.atomic.AtomicInteger; import org.apache.pulsar.broker.PulsarServerException; import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.namespace.NamespaceBundleOwnershipListener; +import org.apache.pulsar.broker.namespace.NamespaceService; +import org.apache.pulsar.broker.systopic.NamespaceEventsSystemTopicFactory; +import org.apache.pulsar.broker.systopic.SystemTopicClient; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.common.events.ActionType; import org.apache.pulsar.common.events.EventType; -import org.apache.pulsar.broker.systopic.NamespaceEventsSystemTopicFactory; -import org.apache.pulsar.broker.systopic.SystemTopicClient; import org.apache.pulsar.common.events.PulsarEvent; import org.apache.pulsar.common.events.TopicPoliciesEvent; import org.apache.pulsar.common.naming.NamespaceBundle; @@ -169,6 +170,10 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic public CompletableFuture<Void> addOwnedNamespaceBundleAsync(NamespaceBundle namespaceBundle) { CompletableFuture<Void> result = new CompletableFuture<>(); NamespaceName namespace = namespaceBundle.getNamespaceObject(); + if (NamespaceService.checkHeartbeatNamespace(namespace) != null) { + result.complete(null); + return result; + } createSystemTopicFactoryIfNeeded(); synchronized (this) { if (readerCaches.get(namespace) != null) { @@ -198,6 +203,9 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic @Override public CompletableFuture<Void> removeOwnedNamespaceBundleAsync(NamespaceBundle namespaceBundle) { NamespaceName namespace = namespaceBundle.getNamespaceObject(); + if (NamespaceService.checkHeartbeatNamespace(namespace) != null) { + return CompletableFuture.completedFuture(null); + } AtomicInteger bundlesCount = ownedBundlesCountPerNamespace.get(namespace); if (bundlesCount == null || bundlesCount.decrementAndGet() <= 0) { CompletableFuture<SystemTopicClient.Reader> readerCompletableFuture = readerCaches.remove(namespace); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java index 1613911..a1f226f 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java @@ -22,6 +22,7 @@ import com.google.common.collect.Sets; import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.mledger.ManagedLedgerConfig; import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; +import org.apache.pulsar.broker.namespace.NamespaceService; import org.apache.pulsar.broker.service.BacklogQuotaManager; import org.apache.pulsar.broker.service.PublishRateLimiterImpl; import org.apache.pulsar.broker.service.Topic; @@ -1577,4 +1578,13 @@ public class TopicPoliciesTest extends MockedPulsarServiceBaseTest { Assert.assertTrue(iStats.compactedLedger.ledgerId != previousCompactedLedgerId); }); } + + @Test + public void testDoNotCreateSystemTopicForHeartbeatNamespace() { + assertTrue(pulsar.getBrokerService().getTopics().size() > 0); + pulsar.getBrokerService().getTopics().forEach((k, v) -> { + TopicName topicName = TopicName.get(k); + assertNull(NamespaceService.checkHeartbeatNamespace(topicName.getNamespaceObject())); + }); + } }
