This is an automated email from the ASF dual-hosted git repository.

xiangying pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.10 by this push:
     new 14d68a494e7 [fix][broker] Fix system service namespace create internal 
event topic. (#17867)
14d68a494e7 is described below

commit 14d68a494e7665888d8eb015b0f25839d879f18d
Author: Jiwei Guo <[email protected]>
AuthorDate: Thu Oct 13 13:17:16 2022 +0800

    [fix][broker] Fix system service namespace create internal event topic. 
(#17867)
    
    (cherry picked from commit 29baa0b2e57e9240dca1d76cd1e38bab95f47357)
---
 .../pulsar/broker/namespace/NamespaceService.java  |  6 ++++
 .../pulsar/broker/service/BrokerService.java       |  3 +-
 .../SystemTopicBasedTopicPoliciesService.java      |  4 +++
 .../apache/pulsar/broker/admin/AdminApi2Test.java  |  2 +-
 .../systopic/PartitionedSystemTopicTest.java       | 32 +++++++++++++++++++---
 5 files changed, 41 insertions(+), 6 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
index dbe41b42c8a..9fee517bce9 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
@@ -1393,6 +1393,12 @@ public class NamespaceService implements AutoCloseable {
                 || SLA_NAMESPACE_PATTERN.matcher(namespace).matches();
     }
 
+    public static boolean isHeartbeatNamespace(ServiceUnitId ns) {
+        String namespace = ns.getNamespaceObject().toString();
+        return HEARTBEAT_NAMESPACE_PATTERN.matcher(namespace).matches()
+                || HEARTBEAT_NAMESPACE_PATTERN_V2.matcher(namespace).matches();
+    }
+
     public boolean registerSLANamespace() throws PulsarServerException {
         boolean isNameSpaceRegistered = 
registerNamespace(getSLAMonitorNamespace(host, config), false);
         if (isNameSpaceRegistered) {
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index db30b18c489..7aad0880bd4 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -1512,7 +1512,8 @@ public class BrokerService implements Closeable {
                     RetentionPolicies retentionPolicies = null;
                     OffloadPoliciesImpl topicLevelOffloadPolicies = null;
 
-                    if (pulsar.getConfig().isTopicLevelPoliciesEnabled()) {
+                    if (pulsar.getConfig().isTopicLevelPoliciesEnabled()
+                            && 
!NamespaceService.isSystemServiceNamespace(namespace.toString())) {
                         try {
                             TopicPolicies topicPolicies = 
pulsar.getTopicPoliciesService().getTopicPolicies(topicName);
                             if (topicPolicies != null) {
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
index 9eafa06d47f..eee57482ee0 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
@@ -101,6 +101,10 @@ public class SystemTopicBasedTopicPoliciesService 
implements TopicPoliciesServic
 
     private CompletableFuture<Void> sendTopicPolicyEvent(TopicName topicName, 
ActionType actionType,
                                                          TopicPolicies 
policies) {
+        if 
(NamespaceService.isHeartbeatNamespace(topicName.getNamespaceObject())) {
+            return FutureUtil.failedFuture(
+                    new BrokerServiceException.NotAllowedException("Not 
allowed to send event to health check topic"));
+        }
         CompletableFuture<Void> result = new CompletableFuture<>();
         try {
             createSystemTopicFactoryIfNeeded();
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
index 2922aa78933..7b6e2176bed 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
@@ -524,7 +524,7 @@ public class AdminApi2Test extends 
MockedPulsarServiceBaseTest {
      * @param namespaceName
      * @throws Exception
      */
-    @Test(dataProvider = "namespaceNames", timeOut = 10000)
+    @Test(dataProvider = "namespaceNames", timeOut = 30000)
     public void testResetCursorOnPosition(String namespaceName) throws 
Exception {
         final String topicName = "persistent://prop-xyz/use/" + namespaceName 
+ "/resetPosition";
         final int totalProducedMessages = 50;
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java
index 9920a097e34..b13919bd9b6 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java
@@ -29,6 +29,7 @@ import org.apache.pulsar.broker.namespace.NamespaceService;
 import org.apache.pulsar.broker.service.BrokerTestBase;
 import org.apache.pulsar.broker.service.Topic;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.Producer;
@@ -172,10 +173,33 @@ public class PartitionedSystemTopicTest extends 
BrokerTestBase {
         LedgerOffloader ledgerOffloader = Mockito.mock(LedgerOffloader.class);
         config.setLedgerOffloader(ledgerOffloader);
         Assert.assertEquals(config.getLedgerOffloader(), ledgerOffloader);
-        admin.topicPolicies().setMaxConsumers(topicName.toString(), 2);
-        Awaitility.await().pollDelay(5, TimeUnit.SECONDS).untilAsserted(() -> {
-            
Assert.assertEquals(persistentTopic.getManagedLedger().getConfig().getLedgerOffloader(),
-                    NullLedgerOffloader.INSTANCE);
+    }
+
+    @Test
+    public void testSystemNamespaceNotCreateChangeEventsTopic() throws 
Exception {
+        admin.brokers().healthcheck(TopicVersion.V2);
+        NamespaceName namespaceName = 
NamespaceService.getHeartbeatNamespaceV2(pulsar.getAdvertisedAddress(),
+                pulsar.getConfig());
+        TopicName topicName = TopicName.get("persistent", namespaceName,
+                EventsTopicNames.NAMESPACE_EVENTS_LOCAL_NAME);
+        Optional<Topic> optionalTopic = pulsar.getBrokerService()
+                .getTopic(topicName.getPartition(1).toString(), false).join();
+        Assert.assertFalse(optionalTopic.isPresent());
+    }
+
+    @Test
+    public void testHeartbeatTopicNotAllowedToSendEvent() throws Exception {
+        admin.brokers().healthcheck(TopicVersion.V2);
+        NamespaceName namespaceName = 
NamespaceService.getHeartbeatNamespaceV2(pulsar.getAdvertisedAddress(),
+                pulsar.getConfig());
+        TopicName topicName = TopicName.get("persistent", namespaceName,
+                EventsTopicNames.NAMESPACE_EVENTS_LOCAL_NAME);
+        for (int partition = 0; partition < PARTITIONS; partition ++) {
+            pulsar.getBrokerService()
+                    .getTopic(topicName.getPartition(partition).toString(), 
true).join();
+        }
+        Assert.assertThrows(PulsarAdminException.ConflictException.class, () 
-> {
+            admin.topicPolicies().setMaxConsumers(topicName.toString(), 2);
         });
     }
 

Reply via email to