This is an automated email from the ASF dual-hosted git repository.

xyz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 5a59ab7768e [fix][client] fix retry topic with exclusive mode. (#23859)
5a59ab7768e is described below

commit 5a59ab7768e11db6ed92dada78b398feca9e24fc
Author: Wenzhi Feng <[email protected]>
AuthorDate: Tue Feb 18 17:59:52 2025 +0800

    [fix][client] fix retry topic with exclusive mode. (#23859)
---
 .../apache/pulsar/client/api/RetryTopicTest.java   | 40 ++++++++++++++++++++++
 .../apache/pulsar/client/impl/ConsumerBase.java    |  8 +++++
 2 files changed, 48 insertions(+)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/RetryTopicTest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/RetryTopicTest.java
index d0e72deb87f..2b897760b6f 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/RetryTopicTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/RetryTopicTest.java
@@ -136,6 +136,46 @@ public class RetryTopicTest extends ProducerConsumerBase {
         checkConsumer.close();
     }
 
+    /**
+     * The retry topic feature relies on the delay queue feature when the consumer 
produces a delayed message
+     * to the retry topic. The delay queue feature is only supported in the shared 
and key-shared subscription types.
+     * As a result, the subscription type of the retry topic should be shared 
or key-shared.
+     * @throws Exception if any client operation in the test fails
+     */
+    @Test
+    public void testRetryTopicWithExclusiveMode() throws Exception {
+        final String topic = 
"persistent://my-property/my-ns/retry-topic-exclusive";
+        final int maxRedeliveryCount = 2;
+
+        Consumer<byte[]> consumer = pulsarClient.newConsumer(Schema.BYTES)
+                .topic(topic)
+                .subscriptionName("my-subscription")
+                .subscriptionType(SubscriptionType.Exclusive)
+                .enableRetry(true)
+                
.deadLetterPolicy(DeadLetterPolicy.builder().maxRedeliverCount(maxRedeliveryCount).build())
+                .receiverQueueSize(100)
+                
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+                .subscribe();
+
+        Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES)
+                .topic(topic)
+                .create();
+
+        producer.send("Hello Pulsar".getBytes());
+        producer.close();
+
+        // receive the message and ask for it to be reconsumed after a 4-second delay
+        Message<byte[]> message = consumer.receive();
+        long timestamp = System.currentTimeMillis();
+        consumer.reconsumeLater(message, 4, TimeUnit.SECONDS);
+
+        // receive again and check that at least 2 seconds elapsed (a looser bound than the 4 seconds requested above)
+        consumer.receive();
+        long delay = System.currentTimeMillis() - timestamp;
+        assertTrue(delay >= 2000);
+        consumer.close();
+    }
+
     @Data
     public static class Foo {
         @Nullable
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
index 1ad8c6d28f1..27e2216e589 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
@@ -48,6 +48,7 @@ import org.apache.pulsar.client.api.BatchReceivePolicy;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.ConsumerBuilder;
 import org.apache.pulsar.client.api.ConsumerEventListener;
+import org.apache.pulsar.client.api.DeadLetterPolicy;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.MessageIdAdv;
@@ -792,6 +793,13 @@ public abstract class ConsumerBase<T> extends HandlerState 
implements Consumer<T
     }
 
     protected SubType getSubType() {
+        // For retry topic, we always use Shared subscription
+        // Because we will produce delayed messages to retry topic.
+        DeadLetterPolicy deadLetterPolicy = conf.getDeadLetterPolicy();
+        if (deadLetterPolicy != null && 
topic.equals(deadLetterPolicy.getRetryLetterTopic())) {
+            return SubType.Shared;
+        }
+
         SubscriptionType type = conf.getSubscriptionType();
         switch (type) {
         case Exclusive:

Reply via email to