This is an automated email from the ASF dual-hosted git repository.

zixuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new d92fb14  Default retry and dlq topic name fixed as per the doc (#891)
d92fb14 is described below

commit d92fb1407d3d39c8a498dd7c7abdc0bbb3fc7e1a
Author: Nitin Goyal <[email protected]>
AuthorDate: Sun Dec 18 15:21:16 2022 +0530

    Default retry and dlq topic name fixed as per the doc (#891)
    
    * default retry and dlq name fixed based on doc
    
    default retry and dlq name fixed based on doc
    
    * backward compatibility fixed
    
    Signed-off-by: Nitin Goyal <[email protected]>
    
    * testcase fixed
    
    Signed-off-by: Nitin Goyal <[email protected]>
    
    * testcase fixed
    
    Signed-off-by: Nitin Goyal <[email protected]>
    
    * testcase fixed
    
    Signed-off-by: Nitin Goyal <[email protected]>
    
    * Lint fixed
    
    * Fix Retry topic when topic is partitioned
    
    * Fix Retry topic when topic is partitioned
    
    * RETRY and DLQ topic name bug fixed for partition topics
    
    * RETRY and DLQ topic name bug fixed for partition topics
    
    * Bug fix
    
    Signed-off-by: Nitin Goyal <[email protected]>
---
 pulsar/consumer_impl.go | 22 ++++++++++++++++++++--
 pulsar/consumer_test.go |  6 +++---
 2 files changed, 23 insertions(+), 5 deletions(-)

diff --git a/pulsar/consumer_impl.go b/pulsar/consumer_impl.go
index 707c13c..e5290a1 100644
--- a/pulsar/consumer_impl.go
+++ b/pulsar/consumer_impl.go
@@ -122,8 +122,26 @@ func newConsumer(client *client, options ConsumerOptions) 
(Consumer, error) {
                        return nil, err
                }
 
-               retryTopic := tn.Domain + "://" + tn.Namespace + "/" + 
options.SubscriptionName + RetryTopicSuffix
-               dlqTopic := tn.Domain + "://" + tn.Namespace + "/" + 
options.SubscriptionName + DlqTopicSuffix
+               topicName := internal.TopicNameWithoutPartitionPart(tn)
+
+               retryTopic := topicName + "-" + options.SubscriptionName + 
RetryTopicSuffix
+               dlqTopic := topicName + "-" + options.SubscriptionName + 
DlqTopicSuffix
+
+               oldRetryTopic := tn.Domain + "://" + tn.Namespace + "/" + 
options.SubscriptionName + RetryTopicSuffix
+               oldDlqTopic := tn.Domain + "://" + tn.Namespace + "/" + 
options.SubscriptionName + DlqTopicSuffix
+
+               if r, err := 
client.lookupService.GetPartitionedTopicMetadata(oldRetryTopic); err == nil &&
+                       r != nil &&
+                       r.Partitions > 0 {
+                       retryTopic = oldRetryTopic
+               }
+
+               if r, err := 
client.lookupService.GetPartitionedTopicMetadata(oldDlqTopic); err == nil &&
+                       r != nil &&
+                       r.Partitions > 0 {
+                       dlqTopic = oldDlqTopic
+               }
+
                if options.DLQ == nil {
                        options.DLQ = &DLQPolicy{
                                MaxDeliveries:    MaxReconsumeTimes,
diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go
index 7a7f118..355e33b 100644
--- a/pulsar/consumer_test.go
+++ b/pulsar/consumer_test.go
@@ -1498,7 +1498,7 @@ func TestRLQ(t *testing.T) {
 
        // 3. Create consumer on the DLQ topic to verify the routing
        dlqConsumer, err := client.Subscribe(ConsumerOptions{
-               Topic:                       "persistent://public/default/" + 
subName + "-DLQ",
+               Topic:                       "persistent://public/default/" + 
topic + "-" + subName + "-DLQ",
                SubscriptionName:            subName,
                SubscriptionInitialPosition: SubscriptionPositionEarliest,
        })
@@ -1603,7 +1603,7 @@ func TestRLQMultiTopics(t *testing.T) {
 
        // subscribe DLQ Topic
        dlqConsumer, err := client.Subscribe(ConsumerOptions{
-               Topic:                       subName + "-DLQ",
+               Topic:                       topics[0] + "-" + subName + "-DLQ",
                SubscriptionName:            subName,
                SubscriptionInitialPosition: SubscriptionPositionEarliest,
        })
@@ -1710,7 +1710,7 @@ func TestRLQSpecifiedPartitionTopic(t *testing.T) {
 
        // subscribe DLQ Topic
        dlqConsumer, err := client.Subscribe(ConsumerOptions{
-               Topic:                       subName + "-DLQ",
+               Topic:                       normalTopic + "-" + subName + 
"-DLQ",
                SubscriptionName:            subName,
                SubscriptionInitialPosition: SubscriptionPositionEarliest,
        })

Reply via email to