This is an automated email from the ASF dual-hosted git repository.

daojun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new ba8e8f5e218 [admin][broker] Fix force delete subscription not working (#22423)
ba8e8f5e218 is described below

commit ba8e8f5e218f01d42f39bc7f62bfc0bcdff99085
Author: 道君 <[email protected]>
AuthorDate: Thu Apr 4 23:08:45 2024 +0800

    [admin][broker] Fix force delete subscription not working (#22423)
---
 .../broker/admin/impl/PersistentTopicsBase.java    |  5 ++--
 .../pulsar/broker/admin/PersistentTopicsTest.java  | 30 ++++++++++++++++++++++
 2 files changed, 32 insertions(+), 3 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index c5e280c5577..c9c29271b6a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -1557,7 +1557,7 @@ public class PersistentTopicsBase extends AdminResource {
                         for (int i = 0; i < partitionMetadata.partitions; i++) 
{
                             TopicName topicNamePartition = 
topicName.getPartition(i);
                             futures.add(adminClient.topics()
-                                    .deleteSubscriptionAsync(topicNamePartition.toString(), subName, false));
+                                    .deleteSubscriptionAsync(topicNamePartition.toString(), subName, force));
                         }
 
                         return FutureUtil.waitForAll(futures).handle((result, 
exception) -> {
@@ -1576,8 +1576,7 @@ public class PersistentTopicsBase extends AdminResource {
                             return null;
                         });
                     }
-                    return internalDeleteSubscriptionForNonPartitionedTopicAsync(subName, authoritative,
-                            force);
+                    return internalDeleteSubscriptionForNonPartitionedTopicAsync(subName, authoritative, force);
                 });
             }
         });
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
index 9a292175caa..f37b53bb0dc 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
@@ -74,10 +74,12 @@ import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.admin.Topics;
 import org.apache.pulsar.client.admin.internal.TopicsImpl;
 import org.apache.pulsar.client.api.CompressionType;
+import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.api.interceptor.ProducerInterceptor;
 import org.apache.pulsar.client.impl.BatchMessageIdImpl;
 import org.apache.pulsar.client.impl.MessageIdImpl;
@@ -1786,4 +1788,32 @@ public class PersistentTopicsTest extends MockedPulsarServiceBaseTest {
         String topicName = "persistent://" + testTenant + "/" + 
testNamespaceLocal + "/testCreateMissingPartitions";
         assertThrows(PulsarAdminException.NotFoundException.class, () -> 
admin.topics().createMissedPartitions(topicName));
     }
+
+    @Test
+    public void testForceDeleteSubscription() throws Exception {
+        try {
+            pulsar.getConfiguration().setAllowAutoSubscriptionCreation(false);
+            String topicName = "persistent://" + testTenant + "/" + 
testNamespaceLocal + "/testForceDeleteSubscription";
+            String subName = "sub1";
+            admin.topics().createNonPartitionedTopic(topicName);
+            admin.topics().createSubscription(topicName, subName, 
MessageId.latest);
+
+            @Cleanup
+            Consumer<String> c0 = pulsarClient.newConsumer(Schema.STRING)
+                    .topic(topicName)
+                    .subscriptionName(subName)
+                    .subscriptionType(SubscriptionType.Shared)
+                    .subscribe();
+            @Cleanup
+            Consumer<String> c1 = pulsarClient.newConsumer(Schema.STRING)
+                    .topic(topicName)
+                    .subscriptionName(subName)
+                    .subscriptionType(SubscriptionType.Shared)
+                    .subscribe();
+
+            admin.topics().deleteSubscription(topicName, subName, true);
+        } finally {
+            pulsar.getConfiguration().setAllowAutoSubscriptionCreation(true);
+        }
+    }
 }

Reply via email to