This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-4.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit f961e7572e83347b7e1201237e6c06fad45b13e4 Author: crossoverJie <[email protected]> AuthorDate: Fri Dec 20 20:19:16 2024 +0800 [fix][client] Fix enableRetry for consumers using legacy topic naming where cluster name is included (#23753) (cherry picked from commit 217ebfbeaab9a33e648912bfae8ed47e9199d41a) --- .../pulsar/client/impl/TopicsConsumerImplTest.java | 18 ++++++++++++++++++ .../apache/pulsar/client/impl/ConsumerBuilderImpl.java | 8 ++++---- 2 files changed, 22 insertions(+), 4 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java index 1d5ac759625..3c7cd16f144 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java @@ -138,6 +138,24 @@ public class TopicsConsumerImplTest extends ProducerConsumerBase { } } + @Test(timeOut = testTimeout) + public void testRetryClusterTopic() throws Exception { + String key = "testRetryClusterTopic"; + final String topicName = "persistent://prop/use/ns-abc1/topic-1-" + key; + TenantInfoImpl tenantInfo = createDefaultTenantInfo(); + final String namespace = "prop/ns-abc1"; + admin.tenants().createTenant("prop", tenantInfo); + admin.namespaces().createNamespace(namespace, Set.of("test")); + Consumer consumer = pulsarClient.newConsumer() + .topic(topicName) + .subscriptionName("my-sub") + .subscriptionType(SubscriptionType.Shared) + .enableRetry(true) + .ackTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS) + .subscribe(); + assertTrue(consumer instanceof MultiTopicsConsumerImpl); + } + @Test(timeOut = testTimeout) public void testGetConsumersAndGetTopics() throws Exception { String key = "TopicsConsumerGet"; diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java index 351025d426a..35f772028f1 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java @@ -154,10 +154,10 @@ public class ConsumerBuilderImpl<T> implements ConsumerBuilder<T> { if (conf.isRetryEnable() && conf.getTopicNames().size() > 0) { TopicName topicFirst = TopicName.get(conf.getTopicNames().iterator().next()); //Issue 9327: do compatibility check in case of the default retry and dead letter topic name changed - String oldRetryLetterTopic = topicFirst.getNamespace() + "/" + conf.getSubscriptionName() - + RetryMessageUtil.RETRY_GROUP_TOPIC_SUFFIX; - String oldDeadLetterTopic = topicFirst.getNamespace() + "/" + conf.getSubscriptionName() - + RetryMessageUtil.DLQ_GROUP_TOPIC_SUFFIX; + String oldRetryLetterTopic = TopicName.get(topicFirst.getDomain().value(), topicFirst.getNamespaceObject(), + conf.getSubscriptionName() + RetryMessageUtil.RETRY_GROUP_TOPIC_SUFFIX).toString(); + String oldDeadLetterTopic = TopicName.get(topicFirst.getDomain().value(), topicFirst.getNamespaceObject(), + conf.getSubscriptionName() + RetryMessageUtil.DLQ_GROUP_TOPIC_SUFFIX).toString(); DeadLetterPolicy deadLetterPolicy = conf.getDeadLetterPolicy(); if (deadLetterPolicy == null || StringUtils.isBlank(deadLetterPolicy.getRetryLetterTopic()) || StringUtils.isBlank(deadLetterPolicy.getDeadLetterTopic())) {
