This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 2d0089e6172 [fix][client] Fix enableRetry for consumers using legacy
topic naming where cluster name is included (#23753)
2d0089e6172 is described below
commit 2d0089e61728c8fa19b009431f24ee9e58abb152
Author: crossoverJie <[email protected]>
AuthorDate: Fri Dec 20 20:19:16 2024 +0800
[fix][client] Fix enableRetry for consumers using legacy topic naming where
cluster name is included (#23753)
(cherry picked from commit 217ebfbeaab9a33e648912bfae8ed47e9199d41a)
---
.../pulsar/client/impl/TopicsConsumerImplTest.java | 18 ++++++++++++++++++
.../apache/pulsar/client/impl/ConsumerBuilderImpl.java | 8 ++++----
2 files changed, 22 insertions(+), 4 deletions(-)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java
index 1d5ac759625..3c7cd16f144 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java
@@ -138,6 +138,24 @@ public class TopicsConsumerImplTest extends
ProducerConsumerBase {
}
}
+ @Test(timeOut = testTimeout)
+ public void testRetryClusterTopic() throws Exception {
+ String key = "testRetryClusterTopic";
+ final String topicName = "persistent://prop/use/ns-abc1/topic-1-" +
key;
+ TenantInfoImpl tenantInfo = createDefaultTenantInfo();
+ final String namespace = "prop/ns-abc1";
+ admin.tenants().createTenant("prop", tenantInfo);
+ admin.namespaces().createNamespace(namespace, Set.of("test"));
+ Consumer consumer = pulsarClient.newConsumer()
+ .topic(topicName)
+ .subscriptionName("my-sub")
+ .subscriptionType(SubscriptionType.Shared)
+ .enableRetry(true)
+ .ackTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS)
+ .subscribe();
+ assertTrue(consumer instanceof MultiTopicsConsumerImpl);
+ }
+
@Test(timeOut = testTimeout)
public void testGetConsumersAndGetTopics() throws Exception {
String key = "TopicsConsumerGet";
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
index 351025d426a..35f772028f1 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
@@ -154,10 +154,10 @@ public class ConsumerBuilderImpl<T> implements
ConsumerBuilder<T> {
if (conf.isRetryEnable() && conf.getTopicNames().size() > 0) {
TopicName topicFirst =
TopicName.get(conf.getTopicNames().iterator().next());
//Issue 9327: do compatibility check in case of the default retry
and dead letter topic name changed
- String oldRetryLetterTopic = topicFirst.getNamespace() + "/" +
conf.getSubscriptionName()
- + RetryMessageUtil.RETRY_GROUP_TOPIC_SUFFIX;
- String oldDeadLetterTopic = topicFirst.getNamespace() + "/" +
conf.getSubscriptionName()
- + RetryMessageUtil.DLQ_GROUP_TOPIC_SUFFIX;
+ String oldRetryLetterTopic =
TopicName.get(topicFirst.getDomain().value(), topicFirst.getNamespaceObject(),
+ conf.getSubscriptionName() +
RetryMessageUtil.RETRY_GROUP_TOPIC_SUFFIX).toString();
+ String oldDeadLetterTopic =
TopicName.get(topicFirst.getDomain().value(), topicFirst.getNamespaceObject(),
+ conf.getSubscriptionName() +
RetryMessageUtil.DLQ_GROUP_TOPIC_SUFFIX).toString();
DeadLetterPolicy deadLetterPolicy = conf.getDeadLetterPolicy();
if (deadLetterPolicy == null ||
StringUtils.isBlank(deadLetterPolicy.getRetryLetterTopic())
||
StringUtils.isBlank(deadLetterPolicy.getDeadLetterTopic())) {