This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 736bd5e65ee [fix][broker] Fix unsubscribe non-durable subscription error (#21099)
736bd5e65ee is described below

commit 736bd5e65eee77113dac7ee7832275bb8d659933
Author: Jiwei Guo <[email protected]>
AuthorDate: Fri Sep 1 18:02:06 2023 +0800

    [fix][broker] Fix unsubscribe non-durable subscription error (#21099)
---
 .../broker/service/persistent/PersistentTopic.java | 11 +++++----
 .../pulsar/broker/service/BrokerServiceTest.java   | 26 ++++++++++++++++++++++
 2 files changed, 31 insertions(+), 6 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 3e3363b8c5b..948c6710447 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -1161,15 +1161,14 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
 
     private void asyncDeleteCursorWithClearDelayedMessage(String subscriptionName,
                                                           CompletableFuture<Void> unsubscribeFuture) {
-        if (!isDelayedDeliveryEnabled()
-                || !(brokerService.getDelayedDeliveryTrackerFactory() instanceof BucketDelayedDeliveryTrackerFactory)) {
-            asyncDeleteCursor(subscriptionName, unsubscribeFuture);
-            return;
-        }
-
         PersistentSubscription persistentSubscription = subscriptions.get(subscriptionName);
         if (persistentSubscription == null) {
             log.warn("[{}][{}] Can't find subscription, skip clear delayed message", topic, subscriptionName);
+            unsubscribeFuture.complete(null);
+            return;
+        }
+        if (!isDelayedDeliveryEnabled()
+                || !(brokerService.getDelayedDeliveryTrackerFactory() instanceof BucketDelayedDeliveryTrackerFactory)) {
             asyncDeleteCursor(subscriptionName, unsubscribeFuture);
             return;
         }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
index 9012c6edb2f..04018d5fb9d 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
@@ -91,6 +91,7 @@ import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.SubscriptionInitialPosition;
+import org.apache.pulsar.client.api.SubscriptionMode;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.impl.ClientCnx;
 import org.apache.pulsar.client.impl.ConnectionPool;
@@ -1623,4 +1624,29 @@ public class BrokerServiceTest extends BrokerTestBase {
         assertEquals(admin.topics().getStats(topicName).getSubscriptions()
                 .get("sub-1").getUnackedMessages(), 0);
     }
+
+    @Test
+    public void testUnsubscribeNonDurableSub() throws Exception {
+        final String ns = "prop/ns-test";
+        final String topic = ns + "/testUnsubscribeNonDurableSub";
+
+        admin.namespaces().createNamespace(ns, 2);
+        admin.topics().createPartitionedTopic(String.format("persistent://%s", topic), 1);
+
+        pulsarClient.newProducer(Schema.STRING).topic(topic).create().close();
+        @Cleanup
+        Consumer<String> consumer = pulsarClient
+                .newConsumer(Schema.STRING)
+                .topic(topic)
+                .subscriptionMode(SubscriptionMode.NonDurable)
+                .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+                .subscriptionName("sub1")
+                .subscriptionType(SubscriptionType.Shared)
+                .subscribe();
+        try {
+            consumer.unsubscribe();
+        } catch (Exception ex) {
+            fail("Unsubscribe failed");
+        }
+    }
 }

Reply via email to