This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 57a78531a4b9014eb58ecb5da5f4c384d73b6bc2 Author: Wenzhi Feng <52550727+thetumb...@users.noreply.github.com> AuthorDate: Mon Mar 17 18:17:24 2025 +0800 [clean][client] Clean code for the construction of retry/dead letter topic name (#24082) (cherry picked from commit a6088103170cebc64cc4e6330a8b30a8ecb93221) --- .../SimpleProducerConsumerDisallowAutoCreateTopicTest.java | 4 ++-- .../apache/pulsar/client/impl/TransactionEndToEndTest.java | 12 ++++-------- .../org/apache/pulsar/client/impl/ConsumerBuilderImpl.java | 8 ++++---- .../java/org/apache/pulsar/client/impl/ConsumerImpl.java | 7 ++----- .../java/org/apache/pulsar/client/util/RetryMessageUtil.java | 8 ++++++++ 5 files changed, 20 insertions(+), 19 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerDisallowAutoCreateTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerDisallowAutoCreateTopicTest.java index 728e556f022..a7927dd372c 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerDisallowAutoCreateTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerDisallowAutoCreateTopicTest.java @@ -18,11 +18,11 @@ */ package org.apache.pulsar.client.api; -import static org.apache.pulsar.client.util.RetryMessageUtil.RETRY_GROUP_TOPIC_SUFFIX; import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.broker.BrokerTestUtil; +import org.apache.pulsar.client.util.RetryMessageUtil; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; @@ -54,7 +54,7 @@ public class SimpleProducerConsumerDisallowAutoCreateTopicTest extends ProducerC public void testClearErrorIfRetryTopicNotExists() throws Exception { final String topicName = BrokerTestUtil.newUniqueName("persistent://public/default/tp_"); final String subName = "sub"; - final String retryTopicName = topicName + "-" + subName + RETRY_GROUP_TOPIC_SUFFIX; + final String retryTopicName = RetryMessageUtil.getRetryTopic(topicName, subName); admin.topics().createNonPartitionedTopic(topicName); Consumer consumer = null; try { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java index 4abcb09c0fe..201340d7315 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java @@ -1508,8 +1508,7 @@ public class TransactionEndToEndTest extends TransactionTestBase { @Cleanup Consumer<byte[]> deadLetterConsumer = pulsarClient.newConsumer() - .topic(String.format("%s-%s" + RetryMessageUtil.DLQ_GROUP_TOPIC_SUFFIX, - topic, subName)) + .topic(RetryMessageUtil.getDLQTopic(topic, subName)) .subscriptionType(SubscriptionType.Shared) .deadLetterPolicy(DeadLetterPolicy.builder().maxRedeliverCount(1).build()) .subscriptionName("test") @@ -1544,8 +1543,7 @@ public class TransactionEndToEndTest extends TransactionTestBase { consumer.close(); deadLetterConsumer.close(); producer.close(); - admin.topics().delete(String.format("%s-%s" + RetryMessageUtil.DLQ_GROUP_TOPIC_SUFFIX, - topic, subName), true); + admin.topics().delete(RetryMessageUtil.getDLQTopic(topic, subName), true); admin.topics().delete(topic, true); } @@ -1572,8 +1570,7 @@ public class TransactionEndToEndTest extends TransactionTestBase { @Cleanup Consumer<byte[]> deadLetterConsumer = pulsarClient.newConsumer() - .topic(String.format("%s-%s" + RetryMessageUtil.DLQ_GROUP_TOPIC_SUFFIX, - topic, subName)) + .topic(RetryMessageUtil.getDLQTopic(topic, subName)) .subscriptionType(SubscriptionType.Shared) .deadLetterPolicy(DeadLetterPolicy.builder().maxRedeliverCount(1).build()) .subscriptionName("test") @@ -1619,8 +1616,7 @@ public class TransactionEndToEndTest extends TransactionTestBase { consumer.close(); deadLetterConsumer.close(); producer.close(); - admin.topics().delete(String.format("%s-%s" + RetryMessageUtil.DLQ_GROUP_TOPIC_SUFFIX, - topic, subName), true); + admin.topics().delete(RetryMessageUtil.getDLQTopic(topic, subName), true); admin.topics().delete(topic, true); } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java index 35f772028f1..48691e06de8 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java @@ -165,10 +165,10 @@ public class ConsumerBuilderImpl<T> implements ConsumerBuilder<T> { CompletableFuture<Boolean> deadLetterTopicMetadata = checkDlqAlreadyExists(oldDeadLetterTopic); applyDLQConfig = CompletableFuture.allOf(retryLetterTopicMetadata, deadLetterTopicMetadata) .thenAccept(__ -> { - String retryLetterTopic = topicFirst + "-" + conf.getSubscriptionName() - + RetryMessageUtil.RETRY_GROUP_TOPIC_SUFFIX; - String deadLetterTopic = topicFirst + "-" + conf.getSubscriptionName() - + RetryMessageUtil.DLQ_GROUP_TOPIC_SUFFIX; + String retryLetterTopic = RetryMessageUtil.getRetryTopic(topicFirst.toString(), + conf.getSubscriptionName()); + String deadLetterTopic = RetryMessageUtil.getDLQTopic(topicFirst.toString(), + conf.getSubscriptionName()); if (retryLetterTopicMetadata.join()) { retryLetterTopic = oldRetryLetterTopic; } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index bad05ae0c97..4443c68321b 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -369,17 +369,14 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle } else { this.deadLetterPolicy = DeadLetterPolicy.builder() .maxRedeliverCount(conf.getDeadLetterPolicy().getMaxRedeliverCount()) - .deadLetterTopic(String.format("%s-%s" + RetryMessageUtil.DLQ_GROUP_TOPIC_SUFFIX, - topic, subscription)) + .deadLetterTopic(RetryMessageUtil.getDLQTopic(topic, subscription)) .build(); } if (StringUtils.isNotBlank(conf.getDeadLetterPolicy().getRetryLetterTopic())) { this.deadLetterPolicy.setRetryLetterTopic(conf.getDeadLetterPolicy().getRetryLetterTopic()); } else { - this.deadLetterPolicy.setRetryLetterTopic(String.format( - "%s-%s" + RetryMessageUtil.RETRY_GROUP_TOPIC_SUFFIX, - topic, subscription)); + this.deadLetterPolicy.setRetryLetterTopic(RetryMessageUtil.getRetryTopic(topic, subscription)); } if (StringUtils.isNotBlank(conf.getDeadLetterPolicy().getInitialSubscriptionName())) { diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/util/RetryMessageUtil.java b/pulsar-client/src/main/java/org/apache/pulsar/client/util/RetryMessageUtil.java index f73c2668779..d30cb2d1ad1 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/util/RetryMessageUtil.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/util/RetryMessageUtil.java @@ -30,4 +30,12 @@ public class RetryMessageUtil { public static final int MAX_RECONSUMETIMES = 16; public static final String RETRY_GROUP_TOPIC_SUFFIX = "-RETRY"; public static final String DLQ_GROUP_TOPIC_SUFFIX = "-DLQ"; + + public static String getRetryTopic(String topic, String subscription) { + return topic + "-" + subscription + RETRY_GROUP_TOPIC_SUFFIX; + } + + public static String getDLQTopic(String topic, String subscription) { + return topic + "-" + subscription + DLQ_GROUP_TOPIC_SUFFIX; + } } \ No newline at end of file