This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new 8465c55  Fix reconsume broken while using non-FQDN topics (#386)
8465c55 is described below

commit 8465c55036eb715663aeca95277106255fee2842
Author: wuYin <[email protected]>
AuthorDate: Thu Nov 5 16:47:20 2020 +0800

    Fix reconsume broken while using non-FQDN topics (#386)
    
    ### Issue
    The Retry policy has no effect when the consumer subscribes with a non-FQDN topic.
    
    - reproduction
        ```go
        client, _ := pulsar.NewClient(pulsar.ClientOptions{URL: 
"pulsar://localhost:6650"})
        consumer, _ := client.Subscribe(pulsar.ConsumerOptions{
                Topic:            "topic-01",
                SubscriptionName: "my-sub",
                RetryEnable:      true,
                DLQ:              &pulsar.DLQPolicy{MaxDeliveries: 2},
        })
        msg, _ := consumer.Receive(context.Background())
        consumer.ReconsumeLater(msg, 5*time.Second)
        ```
    - logs
    
        ```
        WARN[0000] consumer of topic [persistent://public/default/topic-01] not 
exist unexpectedly  topic="[topic-01 persistent://public/default/my-sub-RETRY]"
        ```
    
    ### Cause
    For the MultiTopicConsumer `consumers` map field:
    - key: user-provided topic, possibly non-FQDN.
    - value: consumer instance.
    
    `ReconsumeLater` uses the message's FQDN topic as the key to find the 
`consumer` in `consumers`.
    If the map is keyed by a non-FQDN topic, the lookup misses and the call is 
ignored, so the Retry policy has no effect.
    
    ### Modifications
    - Normalize user provided topics as FQDN topics before initializing 
consumers.
    - Add non-FQDN topic consumption case in Retry policy tests.
    
    
    ### Verifying this change
    
    - [x] Make sure that the change passes the CI checks.
---
 pulsar/consumer_impl.go | 11 ++++++++---
 pulsar/consumer_test.go |  6 +++---
 pulsar/helper.go        | 15 ++++++++-------
 3 files changed, 19 insertions(+), 13 deletions(-)

diff --git a/pulsar/consumer_impl.go b/pulsar/consumer_impl.go
index dceb1e2..8ae9161 100644
--- a/pulsar/consumer_impl.go
+++ b/pulsar/consumer_impl.go
@@ -160,6 +160,8 @@ func newConsumer(client *client, options ConsumerOptions) 
(Consumer, error) {
                return nil, err
        }
 
+       // normalize as FQDN topics
+       var tns []*internal.TopicName
        // single topic consumer
        if options.Topic != "" || len(options.Topics) == 1 {
                topic := options.Topic
@@ -167,17 +169,20 @@ func newConsumer(client *client, options ConsumerOptions) 
(Consumer, error) {
                        topic = options.Topics[0]
                }
 
-               if err := validateTopicNames(topic); err != nil {
+               if tns, err = validateTopicNames(topic); err != nil {
                        return nil, err
                }
-
+               topic = tns[0].Name
                return topicSubscribe(client, options, topic, messageCh, dlq, 
rlq)
        }
 
        if len(options.Topics) > 1 {
-               if err := validateTopicNames(options.Topics...); err != nil {
+               if tns, err = validateTopicNames(options.Topics...); err != nil 
{
                        return nil, err
                }
+               for i := range options.Topics {
+                       options.Topics[i] = tns[i].Name
+               }
 
                return newMultiTopicConsumer(client, options, options.Topics, 
messageCh, dlq, rlq)
        }
diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go
index 97d2de2..f31bb69 100644
--- a/pulsar/consumer_test.go
+++ b/pulsar/consumer_test.go
@@ -1147,7 +1147,7 @@ func TestDLQMultiTopics(t *testing.T) {
 }
 
 func TestRLQ(t *testing.T) {
-       topic := "persistent://public/default/" + newTopicName()
+       topic := newTopicName()
        subName := fmt.Sprintf("sub01-%d", time.Now().Unix())
        maxRedeliveries := 2
        N := 100
@@ -1243,7 +1243,7 @@ func TestRLQ(t *testing.T) {
 func TestRLQMultiTopics(t *testing.T) {
        now := time.Now().Unix()
        topic01 := fmt.Sprintf("persistent://public/default/topic-%d-1", now)
-       topic02 := fmt.Sprintf("persistent://public/default/topic-%d-2", now)
+       topic02 := fmt.Sprintf("topic-%d-2", now)
        topics := []string{topic01, topic02}
 
        subName := fmt.Sprintf("sub01-%d", time.Now().Unix())
@@ -1270,7 +1270,7 @@ func TestRLQMultiTopics(t *testing.T) {
 
        // subscribe DLQ Topic
        dlqConsumer, err := client.Subscribe(ConsumerOptions{
-               Topic:                       "persistent://public/default/" + 
subName + "-DLQ",
+               Topic:                       subName + "-DLQ",
                SubscriptionName:            subName,
                SubscriptionInitialPosition: SubscriptionPositionEarliest,
        })
diff --git a/pulsar/helper.go b/pulsar/helper.go
index 0f8cf20..fb42cb5 100644
--- a/pulsar/helper.go
+++ b/pulsar/helper.go
@@ -53,15 +53,16 @@ func (e *unexpectedErrMsg) Error() string {
        return msg
 }
 
-func validateTopicNames(topics ...string) error {
-       var errs error
-       for _, t := range topics {
-               if _, err := internal.ParseTopicName(t); err != nil {
-                       errs = pkgerrors.Wrapf(err, "invalid topic name: %s", t)
+func validateTopicNames(topics ...string) ([]*internal.TopicName, error) {
+       tns := make([]*internal.TopicName, len(topics))
+       for i, t := range topics {
+               tn, err := internal.ParseTopicName(t)
+               if err != nil {
+                       return nil, pkgerrors.Wrapf(err, "invalid topic name: 
%s", t)
                }
+               tns[i] = tn
        }
-
-       return errs
+       return tns, nil
 }
 
 func toKeyValues(metadata map[string]string) []*pb.KeyValue {

Reply via email to