This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new 2ee34f3  fix retry policy not effective with partitioned topic (#425)
2ee34f3 is described below

commit 2ee34f38746f78b9e997ce882c1db7feeadacb8d
Author: wuYin <[email protected]>
AuthorDate: Mon Jan 4 19:10:22 2021 +0800

    fix retry policy not effective with partitioned topic (#425)
    
    ### Issue
    Retry policy not effective with partitioned topic.
    
    - reproduction
     topic-01 have 2 partitions:
        ```go
        client, _ := pulsar.NewClient(pulsar.ClientOptions{URL: 
"pulsar://localhost:6650"})
        consumer, _ := client.Subscribe(pulsar.ConsumerOptions{
                Topic:            "topic-01",
                SubscriptionName: "my-sub",
                RetryEnable:      true,
                DLQ:              &pulsar.DLQPolicy{MaxDeliveries: 2},
        })
        msg, _ := consumer.Receive(context.Background())
        consumer.ReconsumeLater(msg, 5*time.Second)
        ```
    - logs
    
        ```
        WARN[0000] consumer of topic 
[persistent://public/default/topic-01-partition-0] not exist unexpectedly  
topic=" [persistent://public/default/topic-01 
persistent://platform/psr/my-sub-RETRY]"
        ```
    
    ### Cause
    For MultiTopicConsumer `consumers` map filed:
    - key: FQDN topics, without partition number suffix.
    - value: consumer instance.
    
    `ReconsumeLater` using msg's partition number suffix topic as key, to find 
`consumer` in `consumers`, but allways not found, lead to Retry policy not 
effective.
    
    ### Modifications
    - Trim partition number in partitioned topic before using it
    
    ### Verifying this change
    
    - [x] Make sure that the change passes the CI checks.
---
 pulsar/consumer_multitopic.go | 14 +++++++++++++-
 pulsar/consumer_test.go       |  3 +++
 2 files changed, 16 insertions(+), 1 deletion(-)

diff --git a/pulsar/consumer_multitopic.go b/pulsar/consumer_multitopic.go
index e5ede99..479a12a 100644
--- a/pulsar/consumer_multitopic.go
+++ b/pulsar/consumer_multitopic.go
@@ -24,6 +24,7 @@ import (
        "sync"
        "time"
 
+       "github.com/apache/pulsar-client-go/pulsar/internal"
        pkgerrors "github.com/pkg/errors"
 
        "github.com/apache/pulsar-client-go/pulsar/log"
@@ -136,7 +137,18 @@ func (c *multiTopicConsumer) AckID(msgID MessageID) {
 }
 
 func (c *multiTopicConsumer) ReconsumeLater(msg Message, delay time.Duration) {
-       consumer, ok := c.consumers[msg.Topic()]
+       names, err := validateTopicNames(msg.Topic())
+       if err != nil {
+               c.log.Errorf("validate msg topic %q failed: %v", msg.Topic(), 
err)
+               return
+       }
+       if len(names) != 1 {
+               c.log.Errorf("invalid msg topic %q names: %+v ", msg.Topic(), 
names)
+               return
+       }
+
+       fqdnTopic := internal.TopicNameWithoutPartitionPart(names[0])
+       consumer, ok := c.consumers[fqdnTopic]
        if !ok {
                c.log.Warnf("consumer of topic %s not exist unexpectedly", 
msg.Topic())
                return
diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go
index 5b9b6e0..82a1f97 100644
--- a/pulsar/consumer_test.go
+++ b/pulsar/consumer_test.go
@@ -1149,6 +1149,9 @@ func TestDLQMultiTopics(t *testing.T) {
 
 func TestRLQ(t *testing.T) {
        topic := newTopicName()
+       testURL := adminURL + "/" + "admin/v2/persistent/public/default/" + 
topic + "/partitions"
+       makeHTTPCall(t, http.MethodPut, testURL, "3")
+
        subName := fmt.Sprintf("sub01-%d", time.Now().Unix())
        maxRedeliveries := 2
        N := 100

Reply via email to