This is an automated email from the ASF dual-hosted git repository.
xiangying pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.10 by this push:
new 14d68a494e7 [fix][broker] Fix system service namespace create internal event topic. (#17867)
14d68a494e7 is described below
commit 14d68a494e7665888d8eb015b0f25839d879f18d
Author: Jiwei Guo <[email protected]>
AuthorDate: Thu Oct 13 13:17:16 2022 +0800
[fix][broker] Fix system service namespace create internal event topic. (#17867)
(cherry picked from commit 29baa0b2e57e9240dca1d76cd1e38bab95f47357)
---
.../pulsar/broker/namespace/NamespaceService.java | 6 ++++
.../pulsar/broker/service/BrokerService.java | 3 +-
.../SystemTopicBasedTopicPoliciesService.java | 4 +++
.../apache/pulsar/broker/admin/AdminApi2Test.java | 2 +-
.../systopic/PartitionedSystemTopicTest.java | 32 +++++++++++++++++++---
5 files changed, 41 insertions(+), 6 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
index dbe41b42c8a..9fee517bce9 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
@@ -1393,6 +1393,12 @@ public class NamespaceService implements AutoCloseable {
|| SLA_NAMESPACE_PATTERN.matcher(namespace).matches();
}
+ public static boolean isHeartbeatNamespace(ServiceUnitId ns) {
+ String namespace = ns.getNamespaceObject().toString();
+ return HEARTBEAT_NAMESPACE_PATTERN.matcher(namespace).matches()
+ || HEARTBEAT_NAMESPACE_PATTERN_V2.matcher(namespace).matches();
+ }
+
public boolean registerSLANamespace() throws PulsarServerException {
boolean isNameSpaceRegistered =
registerNamespace(getSLAMonitorNamespace(host, config), false);
if (isNameSpaceRegistered) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index db30b18c489..7aad0880bd4 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -1512,7 +1512,8 @@ public class BrokerService implements Closeable {
RetentionPolicies retentionPolicies = null;
OffloadPoliciesImpl topicLevelOffloadPolicies = null;
- if (pulsar.getConfig().isTopicLevelPoliciesEnabled()) {
+ if (pulsar.getConfig().isTopicLevelPoliciesEnabled()
+ &&
!NamespaceService.isSystemServiceNamespace(namespace.toString())) {
try {
TopicPolicies topicPolicies =
pulsar.getTopicPoliciesService().getTopicPolicies(topicName);
if (topicPolicies != null) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
index 9eafa06d47f..eee57482ee0 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
@@ -101,6 +101,10 @@ public class SystemTopicBasedTopicPoliciesService
implements TopicPoliciesServic
private CompletableFuture<Void> sendTopicPolicyEvent(TopicName topicName,
ActionType actionType,
TopicPolicies
policies) {
+ if
(NamespaceService.isHeartbeatNamespace(topicName.getNamespaceObject())) {
+ return FutureUtil.failedFuture(
+ new BrokerServiceException.NotAllowedException("Not
allowed to send event to health check topic"));
+ }
CompletableFuture<Void> result = new CompletableFuture<>();
try {
createSystemTopicFactoryIfNeeded();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
index 2922aa78933..7b6e2176bed 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
@@ -524,7 +524,7 @@ public class AdminApi2Test extends
MockedPulsarServiceBaseTest {
* @param namespaceName
* @throws Exception
*/
- @Test(dataProvider = "namespaceNames", timeOut = 10000)
+ @Test(dataProvider = "namespaceNames", timeOut = 30000)
public void testResetCursorOnPosition(String namespaceName) throws
Exception {
final String topicName = "persistent://prop-xyz/use/" + namespaceName
+ "/resetPosition";
final int totalProducedMessages = 50;
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java
index 9920a097e34..b13919bd9b6 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java
@@ -29,6 +29,7 @@ import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.broker.service.BrokerTestBase;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Producer;
@@ -172,10 +173,33 @@ public class PartitionedSystemTopicTest extends
BrokerTestBase {
LedgerOffloader ledgerOffloader = Mockito.mock(LedgerOffloader.class);
config.setLedgerOffloader(ledgerOffloader);
Assert.assertEquals(config.getLedgerOffloader(), ledgerOffloader);
- admin.topicPolicies().setMaxConsumers(topicName.toString(), 2);
- Awaitility.await().pollDelay(5, TimeUnit.SECONDS).untilAsserted(() -> {
-
Assert.assertEquals(persistentTopic.getManagedLedger().getConfig().getLedgerOffloader(),
- NullLedgerOffloader.INSTANCE);
+ }
+
+ @Test
+ public void testSystemNamespaceNotCreateChangeEventsTopic() throws
Exception {
+ admin.brokers().healthcheck(TopicVersion.V2);
+ NamespaceName namespaceName =
NamespaceService.getHeartbeatNamespaceV2(pulsar.getAdvertisedAddress(),
+ pulsar.getConfig());
+ TopicName topicName = TopicName.get("persistent", namespaceName,
+ EventsTopicNames.NAMESPACE_EVENTS_LOCAL_NAME);
+ Optional<Topic> optionalTopic = pulsar.getBrokerService()
+ .getTopic(topicName.getPartition(1).toString(), false).join();
+ Assert.assertFalse(optionalTopic.isPresent());
+ }
+
+ @Test
+ public void testHeartbeatTopicNotAllowedToSendEvent() throws Exception {
+ admin.brokers().healthcheck(TopicVersion.V2);
+ NamespaceName namespaceName =
NamespaceService.getHeartbeatNamespaceV2(pulsar.getAdvertisedAddress(),
+ pulsar.getConfig());
+ TopicName topicName = TopicName.get("persistent", namespaceName,
+ EventsTopicNames.NAMESPACE_EVENTS_LOCAL_NAME);
+ for (int partition = 0; partition < PARTITIONS; partition ++) {
+ pulsar.getBrokerService()
+ .getTopic(topicName.getPartition(partition).toString(),
true).join();
+ }
+ Assert.assertThrows(PulsarAdminException.ConflictException.class, ()
-> {
+ admin.topicPolicies().setMaxConsumers(topicName.toString(), 2);
});
}