This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 736bd5e65ee [fix][broker] Fix unsubscribe non-durable subscription
error (#21099)
736bd5e65ee is described below
commit 736bd5e65eee77113dac7ee7832275bb8d659933
Author: Jiwei Guo <[email protected]>
AuthorDate: Fri Sep 1 18:02:06 2023 +0800
[fix][broker] Fix unsubscribe non-durable subscription error (#21099)
---
.../broker/service/persistent/PersistentTopic.java | 11 +++++----
.../pulsar/broker/service/BrokerServiceTest.java | 26 ++++++++++++++++++++++
2 files changed, 31 insertions(+), 6 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 3e3363b8c5b..948c6710447 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -1161,15 +1161,14 @@ public class PersistentTopic extends AbstractTopic
implements Topic, AddEntryCal
private void asyncDeleteCursorWithClearDelayedMessage(String
subscriptionName,
CompletableFuture<Void> unsubscribeFuture) {
- if (!isDelayedDeliveryEnabled()
- || !(brokerService.getDelayedDeliveryTrackerFactory()
instanceof BucketDelayedDeliveryTrackerFactory)) {
- asyncDeleteCursor(subscriptionName, unsubscribeFuture);
- return;
- }
-
PersistentSubscription persistentSubscription =
subscriptions.get(subscriptionName);
if (persistentSubscription == null) {
log.warn("[{}][{}] Can't find subscription, skip clear delayed
message", topic, subscriptionName);
+ unsubscribeFuture.complete(null);
+ return;
+ }
+ if (!isDelayedDeliveryEnabled()
+ || !(brokerService.getDelayedDeliveryTrackerFactory()
instanceof BucketDelayedDeliveryTrackerFactory)) {
asyncDeleteCursor(subscriptionName, unsubscribeFuture);
return;
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
index 9012c6edb2f..04018d5fb9d 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
@@ -91,6 +91,7 @@ import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
+import org.apache.pulsar.client.api.SubscriptionMode;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.ClientCnx;
import org.apache.pulsar.client.impl.ConnectionPool;
@@ -1623,4 +1624,29 @@ public class BrokerServiceTest extends BrokerTestBase {
assertEquals(admin.topics().getStats(topicName).getSubscriptions()
.get("sub-1").getUnackedMessages(), 0);
}
+
+ @Test
+ public void testUnsubscribeNonDurableSub() throws Exception {
+ final String ns = "prop/ns-test";
+ final String topic = ns + "/testUnsubscribeNonDurableSub";
+
+ admin.namespaces().createNamespace(ns, 2);
+ admin.topics().createPartitionedTopic(String.format("persistent://%s",
topic), 1);
+
+ pulsarClient.newProducer(Schema.STRING).topic(topic).create().close();
+ @Cleanup
+ Consumer<String> consumer = pulsarClient
+ .newConsumer(Schema.STRING)
+ .topic(topic)
+ .subscriptionMode(SubscriptionMode.NonDurable)
+
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+ .subscriptionName("sub1")
+ .subscriptionType(SubscriptionType.Shared)
+ .subscribe();
+ try {
+ consumer.unsubscribe();
+ } catch (Exception ex) {
+ fail("Unsubscribe failed");
+ }
+ }
}