This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 2d0089e6172 [fix][client] Fix enableRetry for consumers using legacy topic naming where cluster name is included (#23753)
2d0089e6172 is described below

commit 2d0089e61728c8fa19b009431f24ee9e58abb152
Author: crossoverJie <[email protected]>
AuthorDate: Fri Dec 20 20:19:16 2024 +0800

    [fix][client] Fix enableRetry for consumers using legacy topic naming where cluster name is included (#23753)
    
    (cherry picked from commit 217ebfbeaab9a33e648912bfae8ed47e9199d41a)
---
 .../pulsar/client/impl/TopicsConsumerImplTest.java     | 18 ++++++++++++++++++
 .../apache/pulsar/client/impl/ConsumerBuilderImpl.java |  8 ++++----
 2 files changed, 22 insertions(+), 4 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java
index 1d5ac759625..3c7cd16f144 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java
@@ -138,6 +138,24 @@ public class TopicsConsumerImplTest extends ProducerConsumerBase {
         }
     }
 
+    @Test(timeOut = testTimeout)
+    public void testRetryClusterTopic() throws Exception {
+        String key = "testRetryClusterTopic";
+        final String topicName = "persistent://prop/use/ns-abc1/topic-1-" + key;
+        TenantInfoImpl tenantInfo = createDefaultTenantInfo();
+        final String namespace = "prop/ns-abc1";
+        admin.tenants().createTenant("prop", tenantInfo);
+        admin.namespaces().createNamespace(namespace, Set.of("test"));
+        Consumer consumer = pulsarClient.newConsumer()
+                .topic(topicName)
+                .subscriptionName("my-sub")
+                .subscriptionType(SubscriptionType.Shared)
+                .enableRetry(true)
+                .ackTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS)
+                .subscribe();
+        assertTrue(consumer instanceof MultiTopicsConsumerImpl);
+    }
+
     @Test(timeOut = testTimeout)
     public void testGetConsumersAndGetTopics() throws Exception {
         String key = "TopicsConsumerGet";
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
index 351025d426a..35f772028f1 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
@@ -154,10 +154,10 @@ public class ConsumerBuilderImpl<T> implements ConsumerBuilder<T> {
         if (conf.isRetryEnable() && conf.getTopicNames().size() > 0) {
             TopicName topicFirst = TopicName.get(conf.getTopicNames().iterator().next());
             //Issue 9327: do compatibility check in case of the default retry and dead letter topic name changed
-            String oldRetryLetterTopic = topicFirst.getNamespace() + "/" + conf.getSubscriptionName()
-                    + RetryMessageUtil.RETRY_GROUP_TOPIC_SUFFIX;
-            String oldDeadLetterTopic = topicFirst.getNamespace() + "/" + conf.getSubscriptionName()
-                    + RetryMessageUtil.DLQ_GROUP_TOPIC_SUFFIX;
+            String oldRetryLetterTopic = TopicName.get(topicFirst.getDomain().value(), topicFirst.getNamespaceObject(),
+                    conf.getSubscriptionName() + RetryMessageUtil.RETRY_GROUP_TOPIC_SUFFIX).toString();
+            String oldDeadLetterTopic = TopicName.get(topicFirst.getDomain().value(), topicFirst.getNamespaceObject(),
+                    conf.getSubscriptionName() + RetryMessageUtil.DLQ_GROUP_TOPIC_SUFFIX).toString();
             DeadLetterPolicy deadLetterPolicy = conf.getDeadLetterPolicy();
             if (deadLetterPolicy == null || StringUtils.isBlank(deadLetterPolicy.getRetryLetterTopic())
                     || StringUtils.isBlank(deadLetterPolicy.getDeadLetterTopic())) {

Reply via email to