This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.11 by this push:
new 5eb0ce2ac5c [fix][broker] Fix delete system topic clean topic policy
(#18823) (#19835)
5eb0ce2ac5c is described below
commit 5eb0ce2ac5c40ee13affebe9fc738e6b91ccc96e
Author: Xiangying Meng <[email protected]>
AuthorDate: Mon Mar 20 23:37:23 2023 +0800
[fix][broker] Fix delete system topic clean topic policy (#18823) (#19835)
Co-authored-by: Jiwe Guo <[email protected]>
---
.../pulsar/broker/admin/impl/NamespacesBase.java | 47 +++++++++++++++++-----
.../broker/service/persistent/PersistentTopic.java | 2 +-
.../apache/pulsar/broker/admin/NamespacesTest.java | 36 ++++++++++++++++-
3 files changed, 74 insertions(+), 11 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index c2ce36d49ff..f76e8a02827 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -73,6 +73,7 @@ import org.apache.pulsar.common.naming.NamespaceBundleFactory;
import org.apache.pulsar.common.naming.NamespaceBundleSplitAlgorithm;
import org.apache.pulsar.common.naming.NamespaceBundles;
import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.naming.SystemTopicNames;
import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.AuthAction;
@@ -310,10 +311,21 @@ public abstract class NamespacesBase extends
AdminResource {
// remove from owned namespace map and ephemeral node from ZK
final List<CompletableFuture<Void>> futures = Lists.newArrayList();
// remove system topics first.
+ Set<String> noPartitionedTopicPolicySystemTopic = new HashSet<>();
+ Set<String> partitionedTopicPolicySystemTopic = new HashSet<>();
if (!topics.isEmpty()) {
for (String topic : topics) {
try {
-
futures.add(pulsar().getAdminClient().topics().deleteAsync(topic, true, true));
+ if (SystemTopicNames.isTopicPoliciesSystemTopic(topic)) {
+ TopicName topicName = TopicName.get(topic);
+ if (topicName.isPartitioned()) {
+
partitionedTopicPolicySystemTopic.add(topicName.getPartitionedTopicName());
+ } else {
+ noPartitionedTopicPolicySystemTopic.add(topic);
+ }
+ } else {
+
futures.add(pulsar().getAdminClient().topics().deleteAsync(topic, true, true));
+ }
} catch (Exception ex) {
log.error("[{}] Failed to delete system topic {}",
clientAppId(), topic, ex);
asyncResponse.resume(new
RestException(Status.INTERNAL_SERVER_ERROR, ex));
@@ -321,11 +333,14 @@ public abstract class NamespacesBase extends
AdminResource {
}
}
}
- FutureUtil.waitForAll(futures).thenCompose(__ -> {
- List<CompletableFuture<Void>> deleteBundleFutures =
Lists.newArrayList();
- NamespaceBundles bundles =
pulsar().getNamespaceService().getNamespaceBundleFactory()
- .getBundles(namespaceName);
- for (NamespaceBundle bundle : bundles.getBundles()) {
+ FutureUtil.waitForAll(futures)
+ .thenCompose(ignore ->
internalDeleteTopicsAsync(noPartitionedTopicPolicySystemTopic))
+ .thenCompose(ignore ->
internalDeletePartitionedTopicsAsync(partitionedTopicPolicySystemTopic))
+ .thenCompose(__ -> {
+ List<CompletableFuture<Void>> deleteBundleFutures =
Lists.newArrayList();
+ NamespaceBundles bundles =
pulsar().getNamespaceService().getNamespaceBundleFactory()
+ .getBundles(namespaceName);
+ for (NamespaceBundle bundle : bundles.getBundles()) {
// check if the bundle is owned by any broker, if not
then we do not need to delete the bundle
deleteBundleFutures.add(pulsar().getNamespaceService().getOwnerAsync(bundle).thenCompose(ownership
-> {
if (ownership.isPresent()) {
@@ -475,13 +490,19 @@ public abstract class NamespacesBase extends
AdminResource {
Set<String> nonPartitionedTopics = new HashSet<>();
Set<String> allSystemTopics = new HashSet<>();
Set<String> allPartitionedSystemTopics = new HashSet<>();
+ Set<String> noPartitionedTopicPolicySystemTopic = new
HashSet<>();
+ Set<String> partitionedTopicPolicySystemTopic = new
HashSet<>();
for (String topic : topics) {
try {
TopicName topicName = TopicName.get(topic);
if (topicName.isPartitioned()) {
if
(pulsar().getBrokerService().isSystemTopic(topicName)) {
-
allPartitionedSystemTopics.add(topicName.getPartitionedTopicName());
+ if
(SystemTopicNames.isTopicPoliciesSystemTopic(topic)) {
+
partitionedTopicPolicySystemTopic.add(topicName.getPartitionedTopicName());
+ } else {
+
allPartitionedSystemTopics.add(topicName.getPartitionedTopicName());
+ }
continue;
}
String partitionedTopic =
topicName.getPartitionedTopicName();
@@ -490,12 +511,17 @@ public abstract class NamespacesBase extends
AdminResource {
}
} else {
if
(pulsar().getBrokerService().isSystemTopic(topicName)) {
- allSystemTopics.add(topic);
+ if
(SystemTopicNames.isTopicPoliciesSystemTopic(topic)) {
+
noPartitionedTopicPolicySystemTopic.add(topic);
+ } else {
+ allSystemTopics.add(topic);
+ }
continue;
}
nonPartitionedTopics.add(topic);
}
-
topicFutures.add(pulsar().getAdminClient().topics().deleteAsync(topic, true));
+
topicFutures.add(pulsar().getAdminClient().topics().deleteAsync(
+ topic, true, true));
} catch (Exception e) {
String errorMessage = String.format("Failed to force
delete topic %s, "
+ "but the previous deletion command
of partitioned-topics:%s "
@@ -524,6 +550,9 @@ public abstract class NamespacesBase extends AdminResource {
.thenCompose((ignore) ->
internalDeleteTopicsAsync(allSystemTopics))
.thenCompose((ignore) ->
internalDeletePartitionedTopicsAsync(allPartitionedSystemTopics))
+ .thenCompose(ignore ->
+
internalDeletePartitionedTopicsAsync(partitionedTopicPolicySystemTopic))
+ .thenCompose(ignore ->
internalDeleteTopicsAsync(noPartitionedTopicPolicySystemTopic))
.handle((result, exception) -> {
if (exception != null) {
if (exception.getCause() instanceof
PulsarAdminException) {
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 8e95f2c4313..d8f56d72960 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -1186,7 +1186,7 @@ public class PersistentTopic extends AbstractTopic
implements Topic, AddEntryCal
deleteTopicAuthenticationFuture.thenCompose(ignore ->
deleteSchema())
.thenCompose(ignore -> {
if
(!SystemTopicNames.isTopicPoliciesSystemTopic(topic)
- &&
brokerService.getPulsar().getConfiguration().isSystemTopicEnabled()) {
+ &&
brokerService.getPulsar().getConfiguration().isSystemTopicEnabled()) {
return deleteTopicPolicies();
} else {
return
CompletableFuture.completedFuture(null);
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
index 16dfe5bc9a3..eaa54c1fd34 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
@@ -18,6 +18,7 @@
*/
package org.apache.pulsar.broker.admin;
+import static org.apache.pulsar.common.naming.NamespaceName.SYSTEM_NAMESPACE;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
@@ -57,11 +58,13 @@ import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.Status;
import javax.ws.rs.core.UriBuilder;
import javax.ws.rs.core.UriInfo;
+import lombok.Cleanup;
import org.apache.bookkeeper.client.api.ReadHandle;
import org.apache.bookkeeper.mledger.LedgerOffloader;
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.bookkeeper.util.ZkUtils;
import org.apache.pulsar.broker.BrokerTestUtil;
+import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.admin.v1.Namespaces;
import org.apache.pulsar.broker.admin.v1.PersistentTopics;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
@@ -81,6 +84,7 @@ import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerBuilder;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.common.api.proto.CommandSubscribe;
import org.apache.pulsar.common.naming.NamespaceBundle;
@@ -103,6 +107,7 @@ import
org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.policies.data.SubscribeRate;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
+import org.apache.pulsar.common.policies.data.TopicPolicies;
import org.apache.pulsar.common.policies.data.impl.DispatchRateImpl;
import org.apache.pulsar.metadata.cache.impl.MetadataCacheImpl;
import org.apache.pulsar.metadata.impl.AbstractMetadataStore;
@@ -1938,7 +1943,7 @@ public class NamespacesTest extends
MockedPulsarServiceBaseTest {
}
@Test
- public void testNotClearTopicPolicesWhenDeleteSystemTopic() throws
Exception {
+ public void testNotClearTopicPolicesWhenDeleteTopicPolicyTopic() throws
Exception {
String namespace = this.testTenant + "/delete-systemTopic";
String topic = TopicName.get(TopicDomain.persistent.toString(),
this.testTenant, "delete-systemTopic",
"testNotClearTopicPolicesWhenDeleteSystemTopic").toString();
@@ -1958,4 +1963,33 @@ public class NamespacesTest extends
MockedPulsarServiceBaseTest {
// 4. delete the policies topic and the topic wil not to clear topic
polices
admin.topics().delete(namespace + "/" +
SystemTopicNames.NAMESPACE_EVENTS_LOCAL_NAME, true);
}
+ @Test
+ public void testDeleteTopicPolicyWhenDeleteSystemTopic() throws Exception {
+ conf.setTopicLevelPoliciesEnabled(true);
+ conf.setSystemTopicEnabled(true);
+ Field field =
PulsarService.class.getDeclaredField("topicPoliciesService");
+ field.setAccessible(true);
+ field.set(pulsar, new SystemTopicBasedTopicPoliciesService(pulsar));
+
+ String systemTopic = SYSTEM_NAMESPACE.toString() + "/" +
"testDeleteTopicPolicyWhenDeleteSystemTopic";
+ admin.tenants().createTenant(SYSTEM_NAMESPACE.getTenant(),
+ new TenantInfoImpl(Set.of("role1", "role2"), Set.of("use",
"usc", "usw")));
+
+ admin.namespaces().createNamespace(SYSTEM_NAMESPACE.toString());
+ @Cleanup
+ Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+ .topic(systemTopic).create();
+ admin.topicPolicies().setMaxConsumers(systemTopic, 5);
+
+ Integer maxConsumerPerTopic = pulsar
+ .getTopicPoliciesService()
+
.getTopicPoliciesBypassCacheAsync(TopicName.get(systemTopic)).get()
+ .getMaxConsumerPerTopic();
+
+ assertEquals(maxConsumerPerTopic, Integer.valueOf(5));
+ admin.topics().delete(systemTopic, true);
+ TopicPolicies topicPolicies = pulsar.getTopicPoliciesService()
+
.getTopicPoliciesBypassCacheAsync(TopicName.get(systemTopic)).get(5,
TimeUnit.SECONDS);
+ assertNull(topicPolicies);
+ }
}