This is an automated email from the ASF dual-hosted git repository. xiangying pushed a commit to branch delete_ns in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit dd4ce57f3e06584e647c24445346cb77c221a117 Author: Xiangying Meng <[email protected]> AuthorDate: Fri Dec 9 14:56:37 2022 +0800 [fix][broker] Fix delete system topic clean topic policy (#18823) If users set topic policy for system topic, then delete this system topic, the topic policy should be deleted. Only change_events topic do not need to clear topic policies. (cherry picked from commit 93c41de8aac7dd655491d3b231468753d2d0a113) --- .../pulsar/broker/admin/impl/NamespacesBase.java | 55 ++++++++++++++++------ .../broker/service/persistent/PersistentTopic.java | 3 +- .../apache/pulsar/broker/admin/NamespacesTest.java | 37 ++++++++++++++- 3 files changed, 78 insertions(+), 17 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java index c2ce36d49ff..d8777f97665 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java @@ -73,6 +73,7 @@ import org.apache.pulsar.common.naming.NamespaceBundleFactory; import org.apache.pulsar.common.naming.NamespaceBundleSplitAlgorithm; import org.apache.pulsar.common.naming.NamespaceBundles; import org.apache.pulsar.common.naming.NamespaceName; +import org.apache.pulsar.common.naming.SystemTopicNames; import org.apache.pulsar.common.naming.TopicDomain; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.AuthAction; @@ -310,10 +311,21 @@ public abstract class NamespacesBase extends AdminResource { // remove from owned namespace map and ephemeral node from ZK final List<CompletableFuture<Void>> futures = Lists.newArrayList(); // remove system topics first. + Set<String> noPartitionedTopicPolicySystemTopic = new HashSet<>(); + Set<String> partitionedTopicPolicySystemTopic = new HashSet<>(); if (!topics.isEmpty()) { for (String topic : topics) { try { - futures.add(pulsar().getAdminClient().topics().deleteAsync(topic, true, true)); + if (SystemTopicNames.isTopicPoliciesSystemTopic(topic)) { + TopicName topicName = TopicName.get(topic); + if (topicName.isPartitioned()) { + partitionedTopicPolicySystemTopic.add(topic); + } else { + noPartitionedTopicPolicySystemTopic.add(topic); + } + } else { + futures.add(pulsar().getAdminClient().topics().deleteAsync(topic, true, true)); + } } catch (Exception ex) { log.error("[{}] Failed to delete system topic {}", clientAppId(), topic, ex); asyncResponse.resume(new RestException(Status.INTERNAL_SERVER_ERROR, ex)); @@ -321,11 +333,14 @@ public abstract class NamespacesBase extends AdminResource { } } } - FutureUtil.waitForAll(futures).thenCompose(__ -> { - List<CompletableFuture<Void>> deleteBundleFutures = Lists.newArrayList(); - NamespaceBundles bundles = pulsar().getNamespaceService().getNamespaceBundleFactory() - .getBundles(namespaceName); - for (NamespaceBundle bundle : bundles.getBundles()) { + FutureUtil.waitForAll(futures) + .thenCompose(ignore -> internalDeleteTopicsAsync(noPartitionedTopicPolicySystemTopic)) + .thenCompose(ignore -> internalDeletePartitionedTopicsAsync(partitionedTopicPolicySystemTopic)) + .thenCompose(__ -> { + List<CompletableFuture<Void>> deleteBundleFutures = Lists.newArrayList(); + NamespaceBundles bundles = pulsar().getNamespaceService().getNamespaceBundleFactory() + .getBundles(namespaceName); + for (NamespaceBundle bundle : bundles.getBundles()) { // check if the bundle is owned by any broker, if not then we do not need to delete the bundle deleteBundleFutures.add(pulsar().getNamespaceService().getOwnerAsync(bundle).thenCompose(ownership -> { if (ownership.isPresent()) { @@ -475,27 +490,41 @@ public abstract class NamespacesBase extends AdminResource { Set<String> nonPartitionedTopics = new HashSet<>(); Set<String> allSystemTopics = new HashSet<>(); Set<String> allPartitionedSystemTopics = new HashSet<>(); + Set<String> noPartitionedTopicPolicySystemTopic = new HashSet<>(); + Set<String> partitionedTopicPolicySystemTopic = new HashSet<>(); for (String topic : topics) { try { TopicName topicName = TopicName.get(topic); if (topicName.isPartitioned()) { if (pulsar().getBrokerService().isSystemTopic(topicName)) { - allPartitionedSystemTopics.add(topicName.getPartitionedTopicName()); + if (SystemTopicNames.isTopicPoliciesSystemTopic(topic)) { + partitionedTopicPolicySystemTopic.add(topicName.getPartitionedTopicName()); + } else { + allPartitionedSystemTopics.add(topicName.getPartitionedTopicName()); + } continue; } String partitionedTopic = topicName.getPartitionedTopicName(); if (!partitionedTopics.contains(partitionedTopic)) { + // Distinguish partitioned topic to avoid duplicate deletion of the same schema + topicFutures.add(pulsar().getAdminClient().topics().deletePartitionedTopicAsync( + partitionedTopic, true, true)); partitionedTopics.add(partitionedTopic); } } else { if (pulsar().getBrokerService().isSystemTopic(topicName)) { - allSystemTopics.add(topic); + if (SystemTopicNames.isTopicPoliciesSystemTopic(topic)) { + noPartitionedTopicPolicySystemTopic.add(topic); + } else { + allSystemTopics.add(topic); + } continue; } + topicFutures.add(pulsar().getAdminClient().topics().deleteAsync( + topic, true, true)); nonPartitionedTopics.add(topic); } - topicFutures.add(pulsar().getAdminClient().topics().deleteAsync(topic, true)); } catch (Exception e) { String errorMessage = String.format("Failed to force delete topic %s, " + "but the previous deletion command of partitioned-topics:%s " @@ -508,11 +537,6 @@ public abstract class NamespacesBase extends AdminResource { } } - for (String partitionedTopic : partitionedTopics) { - topicFutures.add(namespaceResources().getPartitionedTopicResources() - .deletePartitionedTopicAsync(TopicName.get(partitionedTopic))); - } - if (log.isDebugEnabled()) { log.debug("Successfully send deletion command of partitioned-topics:{} " + "and non-partitioned-topics:{} in namespace:{}.", @@ -524,6 +548,9 @@ public abstract class NamespacesBase extends AdminResource { .thenCompose((ignore) -> internalDeleteTopicsAsync(allSystemTopics)) .thenCompose((ignore) -> internalDeletePartitionedTopicsAsync(allPartitionedSystemTopics)) + .thenCompose(ignore -> + internalDeletePartitionedTopicsAsync(partitionedTopicPolicySystemTopic)) + .thenCompose(ignore -> internalDeleteTopicsAsync(noPartitionedTopicPolicySystemTopic)) .handle((result, exception) -> { if (exception != null) { if (exception.getCause() instanceof PulsarAdminException) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 8e95f2c4313..c4e8ac73be2 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -1185,8 +1185,7 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal brokerService.deleteTopicAuthenticationWithRetry(topic, deleteTopicAuthenticationFuture, 5); deleteTopicAuthenticationFuture.thenCompose(ignore -> deleteSchema()) .thenCompose(ignore -> { - if (!SystemTopicNames.isTopicPoliciesSystemTopic(topic) - && brokerService.getPulsar().getConfiguration().isSystemTopicEnabled()) { + if (!SystemTopicNames.isTopicPoliciesSystemTopic(topic)) { return deleteTopicPolicies(); } else { return CompletableFuture.completedFuture(null); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java index 16dfe5bc9a3..957088e4717 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.broker.admin; +import static org.apache.pulsar.common.naming.NamespaceName.SYSTEM_NAMESPACE; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.doReturn; @@ -57,11 +58,13 @@ import javax.ws.rs.core.Response; import javax.ws.rs.core.Response.Status; import javax.ws.rs.core.UriBuilder; import javax.ws.rs.core.UriInfo; +import lombok.Cleanup; import org.apache.bookkeeper.client.api.ReadHandle; import org.apache.bookkeeper.mledger.LedgerOffloader; import org.apache.bookkeeper.mledger.ManagedLedgerConfig; import org.apache.bookkeeper.util.ZkUtils; import org.apache.pulsar.broker.BrokerTestUtil; +import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.admin.v1.Namespaces; import org.apache.pulsar.broker.admin.v1.PersistentTopics; import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; @@ -81,6 +84,7 @@ import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.ConsumerBuilder; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.common.api.proto.CommandSubscribe; import org.apache.pulsar.common.naming.NamespaceBundle; @@ -103,6 +107,8 @@ import org.apache.pulsar.common.policies.data.RetentionPolicies; import org.apache.pulsar.common.policies.data.SubscribeRate; import org.apache.pulsar.common.policies.data.TenantInfo; import org.apache.pulsar.common.policies.data.TenantInfoImpl; +import org.apache.pulsar.common.policies.data.TopicPolicies; +import org.apache.pulsar.common.policies.data.TopicType; import org.apache.pulsar.common.policies.data.impl.DispatchRateImpl; import org.apache.pulsar.metadata.cache.impl.MetadataCacheImpl; import org.apache.pulsar.metadata.impl.AbstractMetadataStore; @@ -1938,7 +1944,7 @@ public class NamespacesTest extends MockedPulsarServiceBaseTest { } @Test - public void testNotClearTopicPolicesWhenDeleteSystemTopic() throws Exception { + public void testNotClearTopicPolicesWhenDeleteTopicPolicyTopic() throws Exception { String namespace = this.testTenant + "/delete-systemTopic"; String topic = TopicName.get(TopicDomain.persistent.toString(), this.testTenant, "delete-systemTopic", "testNotClearTopicPolicesWhenDeleteSystemTopic").toString(); @@ -1958,4 +1964,33 @@ public class NamespacesTest extends MockedPulsarServiceBaseTest { // 4. delete the policies topic and the topic wil not to clear topic polices admin.topics().delete(namespace + "/" + SystemTopicNames.NAMESPACE_EVENTS_LOCAL_NAME, true); } + @Test + public void testDeleteTopicPolicyWhenDeleteSystemTopic() throws Exception { + conf.setTopicLevelPoliciesEnabled(true); + conf.setSystemTopicEnabled(true); + Field field = PulsarService.class.getDeclaredField("topicPoliciesService"); + field.setAccessible(true); + field.set(pulsar, new SystemTopicBasedTopicPoliciesService(pulsar)); + + String systemTopic = SYSTEM_NAMESPACE.toString() + "/" + "testDeleteTopicPolicyWhenDeleteSystemTopic"; + admin.tenants().createTenant(SYSTEM_NAMESPACE.getTenant(), + new TenantInfoImpl(Set.of("role1", "role2"), Set.of("use", "usc", "usw"))); + + admin.namespaces().createNamespace(SYSTEM_NAMESPACE.toString()); + @Cleanup + Producer<String> producer = pulsarClient.newProducer(Schema.STRING) + .topic(systemTopic).create(); + admin.topicPolicies().setMaxConsumers(systemTopic, 5); + + Integer maxConsumerPerTopic = pulsar + .getTopicPoliciesService() + .getTopicPoliciesBypassCacheAsync(TopicName.get(systemTopic)).get() + .getMaxConsumerPerTopic(); + + assertEquals(maxConsumerPerTopic, 5); + admin.topics().delete(systemTopic, true); + TopicPolicies topicPolicies = pulsar.getTopicPoliciesService() + .getTopicPoliciesBypassCacheAsync(TopicName.get(systemTopic)).get(5, TimeUnit.SECONDS); + assertNull(topicPolicies); + } }
