This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch branch-3.2 in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-3.2 by this push: new ed58028e0bd [admin][broker] Fix force delete subscription not working (#22423) ed58028e0bd is described below commit ed58028e0bd0939b616d29ea31558fedcbc563d7 Author: 道君 <dao...@apache.org> AuthorDate: Thu Apr 4 23:08:45 2024 +0800 [admin][broker] Fix force delete subscription not working (#22423) --- .../broker/admin/impl/PersistentTopicsBase.java | 5 ++-- .../pulsar/broker/admin/PersistentTopicsTest.java | 30 ++++++++++++++++++++++ 2 files changed, 32 insertions(+), 3 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index 3eddddf8773..318e2bc2cde 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -1566,7 +1566,7 @@ public class PersistentTopicsBase extends AdminResource { for (int i = 0; i < partitionMetadata.partitions; i++) { TopicName topicNamePartition = topicName.getPartition(i); futures.add(adminClient.topics() - .deleteSubscriptionAsync(topicNamePartition.toString(), subName, false)); + .deleteSubscriptionAsync(topicNamePartition.toString(), subName, force)); } return FutureUtil.waitForAll(futures).handle((result, exception) -> { @@ -1585,8 +1585,7 @@ public class PersistentTopicsBase extends AdminResource { return null; }); } - return internalDeleteSubscriptionForNonPartitionedTopicAsync(subName, authoritative, - force); + return internalDeleteSubscriptionForNonPartitionedTopicAsync(subName, authoritative, force); }); } }); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java index 6641a5805c0..8e1375303ce 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java @@ -79,11 +79,13 @@ import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.admin.Topics; import org.apache.pulsar.client.admin.internal.TopicsImpl; import org.apache.pulsar.client.api.CompressionType; +import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.Reader; import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.client.api.interceptor.ProducerInterceptor; import org.apache.pulsar.client.impl.BatchMessageIdImpl; import org.apache.pulsar.client.impl.MessageIdImpl; @@ -1793,6 +1795,34 @@ public class PersistentTopicsTest extends MockedPulsarServiceBaseTest { assertThrows(PulsarAdminException.NotFoundException.class, () -> admin.topics().createMissedPartitions(topicName)); } + @Test + public void testForceDeleteSubscription() throws Exception { + try { + pulsar.getConfiguration().setAllowAutoSubscriptionCreation(false); + String topicName = "persistent://" + testTenant + "/" + testNamespaceLocal + "/testForceDeleteSubscription"; + String subName = "sub1"; + admin.topics().createNonPartitionedTopic(topicName); + admin.topics().createSubscription(topicName, subName, MessageId.latest); + + @Cleanup + Consumer<String> c0 = pulsarClient.newConsumer(Schema.STRING) + .topic(topicName) + .subscriptionName(subName) + .subscriptionType(SubscriptionType.Shared) + .subscribe(); + @Cleanup + Consumer<String> c1 = pulsarClient.newConsumer(Schema.STRING) + .topic(topicName) + .subscriptionName(subName) + .subscriptionType(SubscriptionType.Shared) + .subscribe(); + + admin.topics().deleteSubscription(topicName, subName, true); + } finally { + pulsar.getConfiguration().setAllowAutoSubscriptionCreation(true); + } + } + @Test public void testUpdatePropertiesOnNonDurableSub() throws Exception { String topic = "persistent://" + testTenant + "/" + testNamespaceLocal + "/testUpdatePropertiesOnNonDurableSub";