This is an automated email from the ASF dual-hosted git repository.
daojun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new ba8e8f5e218 [admin][broker] Fix force delete subscription not working
(#22423)
ba8e8f5e218 is described below
commit ba8e8f5e218f01d42f39bc7f62bfc0bcdff99085
Author: 道君 <[email protected]>
AuthorDate: Thu Apr 4 23:08:45 2024 +0800
[admin][broker] Fix force delete subscription not working (#22423)
---
.../broker/admin/impl/PersistentTopicsBase.java | 5 ++--
.../pulsar/broker/admin/PersistentTopicsTest.java | 30 ++++++++++++++++++++++
2 files changed, 32 insertions(+), 3 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index c5e280c5577..c9c29271b6a 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -1557,7 +1557,7 @@ public class PersistentTopicsBase extends AdminResource {
for (int i = 0; i < partitionMetadata.partitions; i++)
{
TopicName topicNamePartition =
topicName.getPartition(i);
futures.add(adminClient.topics()
-
.deleteSubscriptionAsync(topicNamePartition.toString(), subName, false));
+
.deleteSubscriptionAsync(topicNamePartition.toString(), subName, force));
}
return FutureUtil.waitForAll(futures).handle((result,
exception) -> {
@@ -1576,8 +1576,7 @@ public class PersistentTopicsBase extends AdminResource {
return null;
});
}
- return
internalDeleteSubscriptionForNonPartitionedTopicAsync(subName, authoritative,
- force);
+ return
internalDeleteSubscriptionForNonPartitionedTopicAsync(subName, authoritative,
force);
});
}
});
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
index 9a292175caa..f37b53bb0dc 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
@@ -74,10 +74,12 @@ import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.admin.Topics;
import org.apache.pulsar.client.admin.internal.TopicsImpl;
import org.apache.pulsar.client.api.CompressionType;
+import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.api.interceptor.ProducerInterceptor;
import org.apache.pulsar.client.impl.BatchMessageIdImpl;
import org.apache.pulsar.client.impl.MessageIdImpl;
@@ -1786,4 +1788,32 @@ public class PersistentTopicsTest extends
MockedPulsarServiceBaseTest {
String topicName = "persistent://" + testTenant + "/" +
testNamespaceLocal + "/testCreateMissingPartitions";
assertThrows(PulsarAdminException.NotFoundException.class, () ->
admin.topics().createMissedPartitions(topicName));
}
+
+ @Test
+ public void testForceDeleteSubscription() throws Exception {
+ try {
+ pulsar.getConfiguration().setAllowAutoSubscriptionCreation(false);
+ String topicName = "persistent://" + testTenant + "/" +
testNamespaceLocal + "/testForceDeleteSubscription";
+ String subName = "sub1";
+ admin.topics().createNonPartitionedTopic(topicName);
+ admin.topics().createSubscription(topicName, subName,
MessageId.latest);
+
+ @Cleanup
+ Consumer<String> c0 = pulsarClient.newConsumer(Schema.STRING)
+ .topic(topicName)
+ .subscriptionName(subName)
+ .subscriptionType(SubscriptionType.Shared)
+ .subscribe();
+ @Cleanup
+ Consumer<String> c1 = pulsarClient.newConsumer(Schema.STRING)
+ .topic(topicName)
+ .subscriptionName(subName)
+ .subscriptionType(SubscriptionType.Shared)
+ .subscribe();
+
+ admin.topics().deleteSubscription(topicName, subName, true);
+ } finally {
+ pulsar.getConfiguration().setAllowAutoSubscriptionCreation(true);
+ }
+ }
}