This is an automated email from the ASF dual-hosted git repository.
xyz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 5a59ab7768e [fix][client] fix retry topic with exclusive mode. (#23859)
5a59ab7768e is described below
commit 5a59ab7768e11db6ed92dada78b398feca9e24fc
Author: Wenzhi Feng <[email protected]>
AuthorDate: Tue Feb 18 17:59:52 2025 +0800
[fix][client] fix retry topic with exclusive mode. (#23859)
---
.../apache/pulsar/client/api/RetryTopicTest.java | 40 ++++++++++++++++++++++
.../apache/pulsar/client/impl/ConsumerBase.java | 8 +++++
2 files changed, 48 insertions(+)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/RetryTopicTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/RetryTopicTest.java
index d0e72deb87f..2b897760b6f 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/RetryTopicTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/RetryTopicTest.java
@@ -136,6 +136,46 @@ public class RetryTopicTest extends ProducerConsumerBase {
checkConsumer.close();
}
+ /**
+ * Retry topic feature relies on the delay queue feature when consumer
produce a delayed message
+ * to the retry topic. The delay queue feature is only supported in shared
and key-shared subscription type.
+ * As a result, the subscription type of the retry topic should be shared
or key-shared.
+ * @throws Exception
+ */
+ @Test
+ public void testRetryTopicWithExclusiveMode() throws Exception {
+ final String topic =
"persistent://my-property/my-ns/retry-topic-exclusive";
+ final int maxRedeliveryCount = 2;
+
+ Consumer<byte[]> consumer = pulsarClient.newConsumer(Schema.BYTES)
+ .topic(topic)
+ .subscriptionName("my-subscription")
+ .subscriptionType(SubscriptionType.Exclusive)
+ .enableRetry(true)
+
.deadLetterPolicy(DeadLetterPolicy.builder().maxRedeliverCount(maxRedeliveryCount).build())
+ .receiverQueueSize(100)
+
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+ .subscribe();
+
+ Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES)
+ .topic(topic)
+ .create();
+
+ producer.send("Hello Pulsar".getBytes());
+ producer.close();
+
+ // receive message and set delay to 5 seconds
+ Message<byte[]> message = consumer.receive();
+ long timestamp = System.currentTimeMillis();
+ consumer.reconsumeLater(message, 4, TimeUnit.SECONDS);
+
+ // receive message and check the delay is at least 4 seconds
+ consumer.receive();
+ long delay = System.currentTimeMillis() - timestamp;
+ assertTrue(delay >= 2000);
+ consumer.close();
+ }
+
@Data
public static class Foo {
@Nullable
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
index 1ad8c6d28f1..27e2216e589 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
@@ -48,6 +48,7 @@ import org.apache.pulsar.client.api.BatchReceivePolicy;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerBuilder;
import org.apache.pulsar.client.api.ConsumerEventListener;
+import org.apache.pulsar.client.api.DeadLetterPolicy;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.MessageIdAdv;
@@ -792,6 +793,13 @@ public abstract class ConsumerBase<T> extends HandlerState
implements Consumer<T
}
protected SubType getSubType() {
+ // For retry topic, we always use Shared subscription
+ // Because we will produce delayed messages to retry topic.
+ DeadLetterPolicy deadLetterPolicy = conf.getDeadLetterPolicy();
+ if (deadLetterPolicy != null &&
topic.equals(deadLetterPolicy.getRetryLetterTopic())) {
+ return SubType.Shared;
+ }
+
SubscriptionType type = conf.getSubscriptionType();
switch (type) {
case Exclusive: