This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch branch-2.11 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 760b1adf56f5424dedfed4c41805c8874463f29b Author: Jiwei Guo <[email protected]> AuthorDate: Thu Oct 13 13:17:16 2022 +0800 [fix][broker] Fix system service namespace create internal event topic. (#17867) --- .../pulsar/broker/service/BrokerService.java | 3 ++- .../SystemTopicBasedTopicPoliciesService.java | 4 +++ .../apache/pulsar/broker/admin/AdminApi2Test.java | 2 +- .../systopic/PartitionedSystemTopicTest.java | 30 +++++++++++++++++++--- 4 files changed, 33 insertions(+), 6 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index b704044e139..b437f976837 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -1507,7 +1507,8 @@ public class BrokerService implements Closeable { RetentionPolicies retentionPolicies = null; OffloadPoliciesImpl topicLevelOffloadPolicies = null; - if (pulsar.getConfig().isTopicLevelPoliciesEnabled()) { + if (pulsar.getConfig().isTopicLevelPoliciesEnabled() + && !NamespaceService.isSystemServiceNamespace(namespace.toString())) { try { TopicPolicies topicPolicies = pulsar.getTopicPoliciesService().getTopicPolicies(topicName); if (topicPolicies != null) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java index f5db6e2311b..26879bff3ce 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java @@ -101,6 +101,10 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic private CompletableFuture<Void> sendTopicPolicyEvent(TopicName topicName, ActionType actionType, TopicPolicies policies) { + if (NamespaceService.isHeartbeatNamespace(topicName.getNamespaceObject())) { + return CompletableFuture.failedFuture( + new BrokerServiceException.NotAllowedException("Not allowed to send event to health check topic")); + } CompletableFuture<Void> result = new CompletableFuture<>(); try { createSystemTopicFactoryIfNeeded(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java index 31b3ee5355a..bb8749e9226 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java @@ -524,7 +524,7 @@ public class AdminApi2Test extends MockedPulsarServiceBaseTest { * @param namespaceName * @throws Exception */ - @Test(dataProvider = "namespaceNames", timeOut = 10000) + @Test(dataProvider = "namespaceNames", timeOut = 30000) public void testResetCursorOnPosition(String namespaceName) throws Exception { final String topicName = "persistent://prop-xyz/use/" + namespaceName + "/resetPosition"; final int totalProducedMessages = 50; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java index d4ed12573f3..d2ae23bb6c9 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java @@ -30,6 +30,7 @@ import org.apache.pulsar.broker.service.BrokerTestBase; import org.apache.pulsar.broker.service.Topic; import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.client.admin.ListTopicsOptions; +import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.Producer; @@ -172,10 +173,31 @@ public class PartitionedSystemTopicTest extends BrokerTestBase { LedgerOffloader ledgerOffloader = Mockito.mock(LedgerOffloader.class); config.setLedgerOffloader(ledgerOffloader); Assert.assertEquals(config.getLedgerOffloader(), ledgerOffloader); - admin.topicPolicies().setMaxConsumers(topicName.toString(), 2); - Awaitility.await().pollDelay(5, TimeUnit.SECONDS).untilAsserted(() -> { - Assert.assertEquals(persistentTopic.getManagedLedger().getConfig().getLedgerOffloader(), - NullLedgerOffloader.INSTANCE); + } + + @Test + public void testSystemNamespaceNotCreateChangeEventsTopic() throws Exception { + admin.brokers().healthcheck(TopicVersion.V2); + NamespaceName namespaceName = NamespaceService.getHeartbeatNamespaceV2(pulsar.getAdvertisedAddress(), + pulsar.getConfig()); + TopicName topicName = TopicName.get("persistent", namespaceName, SystemTopicNames.NAMESPACE_EVENTS_LOCAL_NAME); + Optional<Topic> optionalTopic = pulsar.getBrokerService() + .getTopic(topicName.getPartition(1).toString(), false).join(); + Assert.assertTrue(optionalTopic.isEmpty()); + } + + @Test + public void testHeartbeatTopicNotAllowedToSendEvent() throws Exception { + admin.brokers().healthcheck(TopicVersion.V2); + NamespaceName namespaceName = NamespaceService.getHeartbeatNamespaceV2(pulsar.getAdvertisedAddress(), + pulsar.getConfig()); + TopicName topicName = TopicName.get("persistent", namespaceName, SystemTopicNames.NAMESPACE_EVENTS_LOCAL_NAME); + for (int partition = 0; partition < PARTITIONS; partition ++) { + pulsar.getBrokerService() + .getTopic(topicName.getPartition(partition).toString(), true).join(); + } + Assert.assertThrows(PulsarAdminException.ConflictException.class, () -> { + admin.topicPolicies().setMaxConsumers(topicName.toString(), 2); }); }
