This is an automated email from the ASF dual-hosted git repository.
yubiao pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 2290073d5d6 [fix][broker] Fix heartbeat namespace create event topic
and cannot delete heartbeat topic (#21360)
2290073d5d6 is described below
commit 2290073d5d61cfac61d7fdff544992089fd22da5
Author: ken <[email protected]>
AuthorDate: Thu Oct 19 18:39:41 2023 +0800
[fix][broker] Fix heartbeat namespace create event topic and cannot delete
heartbeat topic (#21360)
Co-authored-by: fanjianye <[email protected]>
Co-authored-by: Jiwei Guo <[email protected]>
(cherry picked from commit 700a29d5c877dcde5f3c8c1e946b00a8296b8d4f)
---
.../SystemTopicBasedTopicPoliciesService.java | 14 +++++++++----
.../systopic/PartitionedSystemTopicTest.java | 23 ++++++++++++++++++++++
2 files changed, 33 insertions(+), 4 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
index 09f8de818db..9651e76a25e 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
@@ -92,20 +92,23 @@ public class SystemTopicBasedTopicPoliciesService
implements TopicPoliciesServic
@Override
public CompletableFuture<Void> deleteTopicPoliciesAsync(TopicName
topicName) {
+ if
(NamespaceService.isHeartbeatNamespace(topicName.getNamespaceObject())) {
+ return CompletableFuture.completedFuture(null);
+ }
return sendTopicPolicyEvent(topicName, ActionType.DELETE, null);
}
@Override
public CompletableFuture<Void> updateTopicPoliciesAsync(TopicName
topicName, TopicPolicies policies) {
+ if
(NamespaceService.isHeartbeatNamespace(topicName.getNamespaceObject())) {
+ return CompletableFuture.failedFuture(new
BrokerServiceException.NotAllowedException(
+ "Not allowed to update topic policy for the heartbeat
topic"));
+ }
return sendTopicPolicyEvent(topicName, ActionType.UPDATE, policies);
}
private CompletableFuture<Void> sendTopicPolicyEvent(TopicName topicName,
ActionType actionType,
TopicPolicies
policies) {
- if
(NamespaceService.isHeartbeatNamespace(topicName.getNamespaceObject())) {
- return CompletableFuture.failedFuture(
- new BrokerServiceException.NotAllowedException("Not
allowed to send event to health check topic"));
- }
return pulsarService.getPulsarResources().getNamespaceResources()
.getPoliciesAsync(topicName.getNamespaceObject())
.thenCompose(namespacePolicies -> {
@@ -217,6 +220,9 @@ public class SystemTopicBasedTopicPoliciesService
implements TopicPoliciesServic
@Override
public TopicPolicies getTopicPolicies(TopicName topicName,
boolean isGlobal) throws
TopicPoliciesCacheNotInitException {
+ if
(NamespaceService.isHeartbeatNamespace(topicName.getNamespaceObject())) {
+ return null;
+ }
if (!policyCacheInitMap.containsKey(topicName.getNamespaceObject())) {
NamespaceName namespace = topicName.getNamespaceObject();
prepareInitPoliciesCache(namespace, new CompletableFuture<>());
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java
index 42d941e6168..416d7ed0270 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java
@@ -191,6 +191,13 @@ public class PartitionedSystemTopicTest extends
BrokerTestBase {
Optional<Topic> optionalTopic = pulsar.getBrokerService()
.getTopic(topicName.getPartition(1).toString(), false).join();
Assert.assertTrue(optionalTopic.isEmpty());
+
+ TopicName heartbeatTopicName = TopicName.get("persistent",
+ namespaceName, BrokersBase.HEALTH_CHECK_TOPIC_SUFFIX);
+ admin.topics().getRetention(heartbeatTopicName.toString());
+ optionalTopic = pulsar.getBrokerService()
+ .getTopic(topicName.getPartition(1).toString(), false).join();
+ Assert.assertTrue(optionalTopic.isEmpty());
}
@Test
@@ -208,6 +215,22 @@ public class PartitionedSystemTopicTest extends
BrokerTestBase {
});
}
+ @Test
+ public void testHeartbeatTopicBeDeleted() throws Exception {
+ admin.brokers().healthcheck(TopicVersion.V2);
+ NamespaceName namespaceName =
NamespaceService.getHeartbeatNamespaceV2(pulsar.getLookupServiceAddress(),
+ pulsar.getConfig());
+ TopicName heartbeatTopicName = TopicName.get("persistent",
namespaceName, BrokersBase.HEALTH_CHECK_TOPIC_SUFFIX);
+
+ List<String> topics =
getPulsar().getNamespaceService().getListOfPersistentTopics(namespaceName).join();
+ Assert.assertEquals(topics.size(), 1);
+ Assert.assertEquals(topics.get(0), heartbeatTopicName.toString());
+
+ admin.topics().delete(heartbeatTopicName.toString(), true);
+ topics =
getPulsar().getNamespaceService().getListOfPersistentTopics(namespaceName).join();
+ Assert.assertEquals(topics.size(), 0);
+ }
+
@Test
public void testHeartbeatNamespaceNotCreateTransactionInternalTopic()
throws Exception {
admin.brokers().healthcheck(TopicVersion.V2);