This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new 3c523ba  Support retry letter topic  (#359)
3c523ba is described below

commit 3c523ba8288929ec4299b5a362fe8fc9088858e2
Author: wuYin <[email protected]>
AuthorDate: Wed Sep 9 22:00:43 2020 +0800

    Support retry letter topic  (#359)
    
    ### Motivation
    
    Follow [pulsar#6449](https://github.com/apache/pulsar/pull/6449) to support 
retry letter topic in go client
    
    ### Modifications
    
    - Add `retryRouter` for sending reconsume messages to retry letter topic
    - Add `ReconsumeLater(msg Message, delay time.Duration)` to Consumer 
interface
    - Add configurable retry letter topic name in `DLQPolicy`
        ```go
        type DLQPolicy struct {
                // ...
                // Name of the topic where the retry messages will be sent.
                RetryLetterTopic string
        }
        ```
        Enable it explicitly when creating the consumer; it is disabled by default
    
         ```go
        type ConsumerOptions struct {
            // ...
                // Auto retry send messages to default filled DLQPolicy topics
                RetryEnable bool
        }
        ```
    - Add 2 `TestRLQ*`  test cases
---
 pulsar/consumer.go                        |  12 +-
 pulsar/consumer_impl.go                   | 104 ++++++++++++++-
 pulsar/consumer_multitopic.go             |  16 ++-
 pulsar/consumer_regex.go                  |  21 ++-
 pulsar/consumer_regex_test.go             |   6 +-
 pulsar/consumer_test.go                   | 207 +++++++++++++++++++++++++++++-
 pulsar/dlq_router.go                      |   6 +-
 pulsar/impl_message.go                    |   5 +
 pulsar/internal/connection.go             |  32 ++++-
 pulsar/internal/topic_name.go             |   4 +-
 pulsar/producer_impl.go                   |   4 +-
 pulsar/{dlq_router.go => retry_router.go} |  91 ++++++-------
 12 files changed, 423 insertions(+), 85 deletions(-)

diff --git a/pulsar/consumer.go b/pulsar/consumer.go
index c1fe454..a99c030 100644
--- a/pulsar/consumer.go
+++ b/pulsar/consumer.go
@@ -66,7 +66,10 @@ type DLQPolicy struct {
        MaxDeliveries uint32
 
        // Name of the topic where the failing messages will be sent.
-       Topic string
+       DeadLetterTopic string
+
+       // Name of the topic where the retry messages will be sent.
+       RetryLetterTopic string
 }
 
 // ConsumerOptions is used to configure and create instances of Consumer
@@ -107,6 +110,10 @@ type ConsumerOptions struct {
        // By default is nil and there's no DLQ
        DLQ *DLQPolicy
 
+       // Auto retry send messages to default filled DLQPolicy topics
+       // Default is false
+       RetryEnable bool
+
        // Sets a `MessageChannel` for the consumer
        // When a message is received, it will be pushed to the channel for 
consumption
        MessageChannel chan ConsumerMessage
@@ -163,6 +170,9 @@ type Consumer interface {
        // AckID the consumption of a single message, identified by its 
MessageID
        AckID(MessageID)
 
+       // ReconsumeLater mark a message for redelivery after custom delay
+       ReconsumeLater(msg Message, delay time.Duration)
+
        // Acknowledge the failure to process a single message.
        //
        // When a message is "negatively acked" it will be marked for 
redelivery after
diff --git a/pulsar/consumer_impl.go b/pulsar/consumer_impl.go
index b5de1f9..7e266a2 100644
--- a/pulsar/consumer_impl.go
+++ b/pulsar/consumer_impl.go
@@ -22,6 +22,7 @@ import (
        "errors"
        "fmt"
        "math/rand"
+       "strconv"
        "sync"
        "time"
 
@@ -73,6 +74,7 @@ type consumer struct {
        messageCh chan ConsumerMessage
 
        dlq       *dlqRouter
+       rlq       *retryRouter
        closeOnce sync.Once
        closeCh   chan struct{}
        errorCh   chan error
@@ -108,10 +110,50 @@ func newConsumer(client *client, options ConsumerOptions) 
(Consumer, error) {
                messageCh = make(chan ConsumerMessage, 10)
        }
 
+       if options.RetryEnable {
+               usingTopic := ""
+               if options.Topic != "" {
+                       usingTopic = options.Topic
+               } else if len(options.Topics) > 0 {
+                       usingTopic = options.Topics[0]
+               }
+               tn, err := internal.ParseTopicName(usingTopic)
+               if err != nil {
+                       return nil, err
+               }
+
+               retryTopic := tn.Domain + "://" + tn.Namespace + "/" + 
options.SubscriptionName + RetryTopicSuffix
+               dlqTopic := tn.Domain + "://" + tn.Namespace + "/" + 
options.SubscriptionName + DlqTopicSuffix
+               if options.DLQ == nil {
+                       options.DLQ = &DLQPolicy{
+                               MaxDeliveries:    MaxReconsumeTimes,
+                               DeadLetterTopic:  dlqTopic,
+                               RetryLetterTopic: retryTopic,
+                       }
+               } else {
+                       if options.DLQ.DeadLetterTopic == "" {
+                               options.DLQ.DeadLetterTopic = dlqTopic
+                       }
+                       if options.DLQ.RetryLetterTopic == "" {
+                               options.DLQ.RetryLetterTopic = retryTopic
+                       }
+               }
+               if options.Topic != "" && len(options.Topics) == 0 {
+                       options.Topics = []string{options.Topic, 
options.DLQ.RetryLetterTopic}
+                       options.Topic = ""
+               } else if options.Topic == "" && len(options.Topics) > 0 {
+                       options.Topics = append(options.Topics, 
options.DLQ.RetryLetterTopic)
+               }
+       }
+
        dlq, err := newDlqRouter(client, options.DLQ)
        if err != nil {
                return nil, err
        }
+       rlq, err := newRetryRouter(client, options.DLQ, options.RetryEnable)
+       if err != nil {
+               return nil, err
+       }
 
        // single topic consumer
        if options.Topic != "" || len(options.Topics) == 1 {
@@ -124,7 +166,7 @@ func newConsumer(client *client, options ConsumerOptions) 
(Consumer, error) {
                        return nil, err
                }
 
-               return topicSubscribe(client, options, topic, messageCh, dlq)
+               return topicSubscribe(client, options, topic, messageCh, dlq, 
rlq)
        }
 
        if len(options.Topics) > 1 {
@@ -132,7 +174,7 @@ func newConsumer(client *client, options ConsumerOptions) 
(Consumer, error) {
                        return nil, err
                }
 
-               return newMultiTopicConsumer(client, options, options.Topics, 
messageCh, dlq)
+               return newMultiTopicConsumer(client, options, options.Topics, 
messageCh, dlq, rlq)
        }
 
        if options.TopicsPattern != "" {
@@ -145,14 +187,14 @@ func newConsumer(client *client, options ConsumerOptions) 
(Consumer, error) {
                if err != nil {
                        return nil, err
                }
-               return newRegexConsumer(client, options, tn, pattern, 
messageCh, dlq)
+               return newRegexConsumer(client, options, tn, pattern, 
messageCh, dlq, rlq)
        }
 
        return nil, newError(ResultInvalidTopicName, "topic name is required 
for consumer")
 }
 
 func newInternalConsumer(client *client, options ConsumerOptions, topic string,
-       messageCh chan ConsumerMessage, dlq *dlqRouter, 
disableForceTopicCreation bool) (*consumer, error) {
+       messageCh chan ConsumerMessage, dlq *dlqRouter, rlq *retryRouter, 
disableForceTopicCreation bool) (*consumer, error) {
 
        consumer := &consumer{
                topic:                     topic,
@@ -163,6 +205,7 @@ func newInternalConsumer(client *client, options 
ConsumerOptions, topic string,
                closeCh:                   make(chan struct{}),
                errorCh:                   make(chan error),
                dlq:                       dlq,
+               rlq:                       rlq,
                log:                       log.WithField("topic", topic),
                consumerName:              options.Name,
        }
@@ -306,8 +349,8 @@ func (c *consumer) internalTopicSubscribeToPartitions() 
error {
 }
 
 func topicSubscribe(client *client, options ConsumerOptions, topic string,
-       messageCh chan ConsumerMessage, dlqRouter *dlqRouter) (Consumer, error) 
{
-       c, err := newInternalConsumer(client, options, topic, messageCh, 
dlqRouter, false)
+       messageCh chan ConsumerMessage, dlqRouter *dlqRouter, retryRouter 
*retryRouter) (Consumer, error) {
+       c, err := newInternalConsumer(client, options, topic, messageCh, 
dlqRouter, retryRouter, false)
        if err == nil {
                consumersOpened.Inc()
        }
@@ -375,6 +418,54 @@ func (c *consumer) AckID(msgID MessageID) {
        c.consumers[mid.partitionIdx].AckID(mid)
 }
 
+// ReconsumeLater mark a message for redelivery after custom delay
+func (c *consumer) ReconsumeLater(msg Message, delay time.Duration) {
+       if delay < 0 {
+               delay = 0
+       }
+       msgID, ok := c.messageID(msg.ID())
+       if !ok {
+               return
+       }
+       props := make(map[string]string)
+       for k, v := range msg.Properties() {
+               props[k] = v
+       }
+
+       reconsumeTimes := 1
+       if s, ok := props[SysPropertyReconsumeTimes]; ok {
+               reconsumeTimes, _ = strconv.Atoi(s)
+               reconsumeTimes++
+       } else {
+               props[SysPropertyRealTopic] = msg.Topic()
+               props[SysPropertyOriginMessageID] = msgID.messageID.String()
+       }
+       props[SysPropertyReconsumeTimes] = strconv.Itoa(reconsumeTimes)
+       props[SysPropertyDelayTime] = fmt.Sprintf("%d", int64(delay)/1e6)
+
+       consumerMsg := ConsumerMessage{
+               Consumer: c,
+               Message: &message{
+                       payLoad:    msg.Payload(),
+                       properties: props,
+                       msgID:      msgID,
+               },
+       }
+       if uint32(reconsumeTimes) > c.dlq.policy.MaxDeliveries {
+               c.dlq.Chan() <- consumerMsg
+       } else {
+               c.rlq.Chan() <- RetryMessage{
+                       consumerMsg: consumerMsg,
+                       producerMsg: ProducerMessage{
+                               Payload:      msg.Payload(),
+                               Key:          msg.Key(),
+                               Properties:   props,
+                               DeliverAfter: delay,
+                       },
+               }
+       }
+}
+
 func (c *consumer) Nack(msg Message) {
        c.NackID(msg.ID())
 }
@@ -411,6 +502,7 @@ func (c *consumer) Close() {
                c.ticker.Stop()
                c.client.handlers.Del(c)
                c.dlq.close()
+               c.rlq.close()
                consumersClosed.Inc()
                consumersPartitions.Sub(float64(len(c.consumers)))
        })
diff --git a/pulsar/consumer_multitopic.go b/pulsar/consumer_multitopic.go
index 8d34203..0e823ca 100644
--- a/pulsar/consumer_multitopic.go
+++ b/pulsar/consumer_multitopic.go
@@ -38,6 +38,7 @@ type multiTopicConsumer struct {
        consumers map[string]Consumer
 
        dlq       *dlqRouter
+       rlq       *retryRouter
        closeOnce sync.Once
        closeCh   chan struct{}
 
@@ -45,19 +46,20 @@ type multiTopicConsumer struct {
 }
 
 func newMultiTopicConsumer(client *client, options ConsumerOptions, topics 
[]string,
-       messageCh chan ConsumerMessage, dlq *dlqRouter) (Consumer, error) {
+       messageCh chan ConsumerMessage, dlq *dlqRouter, rlq *retryRouter) 
(Consumer, error) {
        mtc := &multiTopicConsumer{
                options:      options,
                messageCh:    messageCh,
                consumers:    make(map[string]Consumer, len(topics)),
                closeCh:      make(chan struct{}),
                dlq:          dlq,
+               rlq:          rlq,
                log:          &log.Entry{},
                consumerName: options.Name,
        }
 
        var errs error
-       for ce := range subscriber(client, topics, options, messageCh, dlq) {
+       for ce := range subscriber(client, topics, options, messageCh, dlq, 
rlq) {
                if ce.err != nil {
                        errs = pkgerrors.Wrapf(ce.err, "unable to subscribe to 
topic=%s", ce.topic)
                } else {
@@ -134,6 +136,15 @@ func (c *multiTopicConsumer) AckID(msgID MessageID) {
        mid.Ack()
 }
 
+func (c *multiTopicConsumer) ReconsumeLater(msg Message, delay time.Duration) {
+       consumer, ok := c.consumers[msg.Topic()]
+       if !ok {
+               c.log.Warnf("consumer of topic %s not exist unexpectedly", 
msg.Topic())
+               return
+       }
+       consumer.ReconsumeLater(msg, delay)
+}
+
 func (c *multiTopicConsumer) Nack(msg Message) {
        c.NackID(msg.ID())
 }
@@ -166,6 +177,7 @@ func (c *multiTopicConsumer) Close() {
                wg.Wait()
                close(c.closeCh)
                c.dlq.close()
+               c.rlq.close()
                consumersClosed.Inc()
        })
 }
diff --git a/pulsar/consumer_regex.go b/pulsar/consumer_regex.go
index e0fdbcb..13ef600 100644
--- a/pulsar/consumer_regex.go
+++ b/pulsar/consumer_regex.go
@@ -40,6 +40,7 @@ const (
 type regexConsumer struct {
        client *client
        dlq    *dlqRouter
+       rlq    *retryRouter
 
        options ConsumerOptions
 
@@ -64,10 +65,11 @@ type regexConsumer struct {
 }
 
 func newRegexConsumer(c *client, opts ConsumerOptions, tn *internal.TopicName, 
pattern *regexp.Regexp,
-       msgCh chan ConsumerMessage, dlq *dlqRouter) (Consumer, error) {
+       msgCh chan ConsumerMessage, dlq *dlqRouter, rlq *retryRouter) 
(Consumer, error) {
        rc := &regexConsumer{
                client:    c,
                dlq:       dlq,
+               rlq:       rlq,
                options:   opts,
                messageCh: msgCh,
 
@@ -90,7 +92,7 @@ func newRegexConsumer(c *client, opts ConsumerOptions, tn 
*internal.TopicName, p
        }
 
        var errs error
-       for ce := range subscriber(c, topics, opts, msgCh, dlq) {
+       for ce := range subscriber(c, topics, opts, msgCh, dlq, rlq) {
                if ce.err != nil {
                        errs = pkgerrors.Wrapf(ce.err, "unable to subscribe to 
topic=%s", ce.topic)
                } else {
@@ -163,6 +165,10 @@ func (c *regexConsumer) Ack(msg Message) {
        c.AckID(msg.ID())
 }
 
+func (c *regexConsumer) ReconsumeLater(msg Message, delay time.Duration) {
+       c.log.Warnf("regexp consumer not support ReconsumeLater yet.")
+}
+
 // Ack the consumption of a single message, identified by its MessageID
 func (c *regexConsumer) AckID(msgID MessageID) {
        mid, ok := toTrackingMessageID(msgID)
@@ -215,6 +221,7 @@ func (c *regexConsumer) Close() {
                }
                wg.Wait()
                c.dlq.close()
+               c.rlq.close()
                consumersClosed.Inc()
        })
 }
@@ -253,7 +260,7 @@ func (c *regexConsumer) monitor() {
                        }
                case topics := <-c.subscribeCh:
                        if len(topics) > 0 && !c.closed() {
-                               c.subscribe(topics, c.dlq)
+                               c.subscribe(topics, c.dlq, c.rlq)
                        }
                case topics := <-c.unsubscribeCh:
                        if len(topics) > 0 && !c.closed() {
@@ -298,12 +305,12 @@ func (c *regexConsumer) knownTopics() []string {
        return topics
 }
 
-func (c *regexConsumer) subscribe(topics []string, dlq *dlqRouter) {
+func (c *regexConsumer) subscribe(topics []string, dlq *dlqRouter, rlq 
*retryRouter) {
        if log.GetLevel() == log.DebugLevel {
                c.log.WithField("topics", topics).Debug("subscribe")
        }
        consumers := make(map[string]Consumer, len(topics))
-       for ce := range subscriber(c.client, topics, c.options, c.messageCh, 
dlq) {
+       for ce := range subscriber(c.client, topics, c.options, c.messageCh, 
dlq, rlq) {
                if ce.err != nil {
                        c.log.Warnf("Failed to subscribe to topic=%s", ce.topic)
                } else {
@@ -359,7 +366,7 @@ type consumerError struct {
 }
 
 func subscriber(c *client, topics []string, opts ConsumerOptions, ch chan 
ConsumerMessage,
-       dlq *dlqRouter) <-chan consumerError {
+       dlq *dlqRouter, rlq *retryRouter) <-chan consumerError {
        consumerErrorCh := make(chan consumerError, len(topics))
        var wg sync.WaitGroup
        wg.Add(len(topics))
@@ -371,7 +378,7 @@ func subscriber(c *client, topics []string, opts 
ConsumerOptions, ch chan Consum
        for _, t := range topics {
                go func(topic string) {
                        defer wg.Done()
-                       c, err := newInternalConsumer(c, opts, topic, ch, dlq, 
true)
+                       c, err := newInternalConsumer(c, opts, topic, ch, dlq, 
rlq, true)
                        consumerErrorCh <- consumerError{
                                err:      err,
                                topic:    topic,
diff --git a/pulsar/consumer_regex_test.go b/pulsar/consumer_regex_test.go
index c45c40a..e4acf5f 100644
--- a/pulsar/consumer_regex_test.go
+++ b/pulsar/consumer_regex_test.go
@@ -154,7 +154,8 @@ func runRegexConsumerDiscoverPatternAll(t *testing.T, c 
Client, namespace string
        }
 
        dlq, _ := newDlqRouter(c.(*client), nil)
-       consumer, err := newRegexConsumer(c.(*client), opts, tn, pattern, 
make(chan ConsumerMessage, 1), dlq)
+       rlq, _ := newRetryRouter(c.(*client), nil, false)
+       consumer, err := newRegexConsumer(c.(*client), opts, tn, pattern, 
make(chan ConsumerMessage, 1), dlq, rlq)
        if err != nil {
                t.Fatal(err)
        }
@@ -202,7 +203,8 @@ func runRegexConsumerDiscoverPatternFoo(t *testing.T, c 
Client, namespace string
        }
 
        dlq, _ := newDlqRouter(c.(*client), nil)
-       consumer, err := newRegexConsumer(c.(*client), opts, tn, pattern, 
make(chan ConsumerMessage, 1), dlq)
+       rlq, _ := newRetryRouter(c.(*client), nil, false)
+       consumer, err := newRegexConsumer(c.(*client), opts, tn, pattern, 
make(chan ConsumerMessage, 1), dlq, rlq)
        if err != nil {
                t.Fatal(err)
        }
diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go
index a3a22b6..97d2de2 100644
--- a/pulsar/consumer_test.go
+++ b/pulsar/consumer_test.go
@@ -973,8 +973,8 @@ func TestDLQ(t *testing.T) {
                NackRedeliveryDelay: 1 * time.Second,
                Type:                Shared,
                DLQ: &DLQPolicy{
-                       MaxDeliveries: 3,
-                       Topic:         dlqTopic,
+                       MaxDeliveries:   3,
+                       DeadLetterTopic: dlqTopic,
                },
        })
        assert.Nil(t, err)
@@ -1076,8 +1076,8 @@ func TestDLQMultiTopics(t *testing.T) {
                NackRedeliveryDelay: 1 * time.Second,
                Type:                Shared,
                DLQ: &DLQPolicy{
-                       MaxDeliveries: 3,
-                       Topic:         dlqTopic,
+                       MaxDeliveries:   3,
+                       DeadLetterTopic: dlqTopic,
                },
        })
        assert.Nil(t, err)
@@ -1146,6 +1146,205 @@ func TestDLQMultiTopics(t *testing.T) {
        assert.Nil(t, msg)
 }
 
+func TestRLQ(t *testing.T) {
+       topic := "persistent://public/default/" + newTopicName()
+       subName := fmt.Sprintf("sub01-%d", time.Now().Unix())
+       maxRedeliveries := 2
+       N := 100
+       ctx := context.Background()
+
+       client, err := NewClient(ClientOptions{URL: lookupURL})
+       assert.Nil(t, err)
+       defer client.Close()
+
+       // 1. Pre-produce N messages
+       producer, err := client.CreateProducer(ProducerOptions{Topic: topic})
+       assert.Nil(t, err)
+       defer producer.Close()
+
+       for i := 0; i < N; i++ {
+               _, err = producer.Send(ctx, &ProducerMessage{Payload: 
[]byte(fmt.Sprintf("MESSAGE_%d", i))})
+               assert.Nil(t, err)
+       }
+
+       // 2. Create consumer on the Retry Topic to reconsume N messages 
(maxRedeliveries+1) times
+       rlqConsumer, err := client.Subscribe(ConsumerOptions{
+               Topic:                       topic,
+               SubscriptionName:            subName,
+               Type:                        Shared,
+               SubscriptionInitialPosition: SubscriptionPositionEarliest,
+               DLQ: &DLQPolicy{
+                       MaxDeliveries: uint32(maxRedeliveries),
+               },
+               RetryEnable:         true,
+               NackRedeliveryDelay: 1 * time.Second,
+       })
+       assert.Nil(t, err)
+       defer rlqConsumer.Close()
+
+       rlqReceived := 0
+       for rlqReceived < N*(maxRedeliveries+1) {
+               msg, err := rlqConsumer.Receive(ctx)
+               assert.Nil(t, err)
+               rlqConsumer.ReconsumeLater(msg, 1*time.Second)
+               rlqReceived++
+       }
+       fmt.Println("retry consumed:", rlqReceived) // 300
+
+       // No more messages on the Retry Topic
+       rlqCtx, rlqCancel := context.WithTimeout(context.Background(), 
500*time.Millisecond)
+       defer rlqCancel()
+       msg, err := rlqConsumer.Receive(rlqCtx)
+       assert.Error(t, err)
+       assert.Nil(t, msg)
+
+       // 3. Create consumer on the DLQ topic to verify the routing
+       dlqConsumer, err := client.Subscribe(ConsumerOptions{
+               Topic:                       "persistent://public/default/" + 
subName + "-DLQ",
+               SubscriptionName:            subName,
+               SubscriptionInitialPosition: SubscriptionPositionEarliest,
+       })
+       assert.Nil(t, err)
+       defer dlqConsumer.Close()
+
+       dlqReceived := 0
+       for dlqReceived < N {
+               msg, err := dlqConsumer.Receive(ctx)
+               assert.Nil(t, err)
+               dlqConsumer.Ack(msg)
+               dlqReceived++
+       }
+       fmt.Println("dlq received:", dlqReceived) // 100
+
+       // No more messages on the DLQ Topic
+       dlqCtx, dlqCancel := context.WithTimeout(context.Background(), 
500*time.Millisecond)
+       defer dlqCancel()
+       msg, err = dlqConsumer.Receive(dlqCtx)
+       assert.Error(t, err)
+       assert.Nil(t, msg)
+
+       // 4. No more messages for same subscription
+       checkConsumer, err := client.Subscribe(ConsumerOptions{
+               Topic:                       topic,
+               SubscriptionName:            subName,
+               Type:                        Shared,
+               SubscriptionInitialPosition: SubscriptionPositionEarliest,
+       })
+       assert.Nil(t, err)
+       defer checkConsumer.Close()
+
+       checkCtx, checkCancel := context.WithTimeout(context.Background(), 
500*time.Millisecond)
+       defer checkCancel()
+       checkMsg, err := checkConsumer.Receive(checkCtx)
+       assert.Error(t, err)
+       assert.Nil(t, checkMsg)
+}
+
+func TestRLQMultiTopics(t *testing.T) {
+       now := time.Now().Unix()
+       topic01 := fmt.Sprintf("persistent://public/default/topic-%d-1", now)
+       topic02 := fmt.Sprintf("persistent://public/default/topic-%d-2", now)
+       topics := []string{topic01, topic02}
+
+       subName := fmt.Sprintf("sub01-%d", time.Now().Unix())
+       maxRedeliveries := 2
+       N := 100
+       ctx := context.Background()
+
+       client, err := NewClient(ClientOptions{URL: lookupURL})
+       assert.Nil(t, err)
+       defer client.Close()
+
+       // subscribe multi topics with Retry Topics
+       rlqConsumer, err := client.Subscribe(ConsumerOptions{
+               Topics:                      topics,
+               SubscriptionName:            subName,
+               Type:                        Shared,
+               SubscriptionInitialPosition: SubscriptionPositionEarliest,
+               DLQ:                         &DLQPolicy{MaxDeliveries: 
uint32(maxRedeliveries)},
+               RetryEnable:                 true,
+               NackRedeliveryDelay:         1 * time.Second,
+       })
+       assert.Nil(t, err)
+       defer rlqConsumer.Close()
+
+       // subscribe DLQ Topic
+       dlqConsumer, err := client.Subscribe(ConsumerOptions{
+               Topic:                       "persistent://public/default/" + 
subName + "-DLQ",
+               SubscriptionName:            subName,
+               SubscriptionInitialPosition: SubscriptionPositionEarliest,
+       })
+       assert.Nil(t, err)
+       defer dlqConsumer.Close()
+
+       // create multi producers
+       producer01, err := client.CreateProducer(ProducerOptions{Topic: 
topic01})
+       assert.Nil(t, err)
+       defer producer01.Close()
+
+       producer02, err := client.CreateProducer(ProducerOptions{Topic: 
topic02})
+       assert.Nil(t, err)
+       defer producer02.Close()
+
+       // 1. Pre-produce N messages for every topic
+       for i := 0; i < N; i++ {
+               _, err = producer01.Send(ctx, &ProducerMessage{Payload: 
[]byte(fmt.Sprintf("MSG_01_%d", i))})
+               assert.Nil(t, err)
+               _, err = producer02.Send(ctx, &ProducerMessage{Payload: 
[]byte(fmt.Sprintf("MSG_02_%d", i))})
+               assert.Nil(t, err)
+       }
+
+       // 2. Create consumer on the Retry Topics to reconsume 2*N messages 
(maxRedeliveries+1) times
+       rlqReceived := 0
+       for rlqReceived < 2*N*(maxRedeliveries+1) {
+               msg, err := rlqConsumer.Receive(ctx)
+               assert.Nil(t, err)
+               rlqConsumer.ReconsumeLater(msg, 1*time.Second)
+               rlqReceived++
+       }
+       fmt.Println("retry consumed:", rlqReceived) // 600
+
+       // No more messages on the Retry Topic
+       rlqCtx, rlqCancel := context.WithTimeout(context.Background(), 
500*time.Millisecond)
+       defer rlqCancel()
+       msg, err := rlqConsumer.Receive(rlqCtx)
+       assert.Error(t, err)
+       assert.Nil(t, msg)
+
+       // 3. Create consumer on the DLQ topic to verify the routing
+       dlqReceived := 0
+       for dlqReceived < 2*N {
+               msg, err := dlqConsumer.Receive(ctx)
+               assert.Nil(t, err)
+               dlqConsumer.Ack(msg)
+               dlqReceived++
+       }
+       fmt.Println("dlq received:", dlqReceived) // 200
+
+       // No more messages on the DLQ Topic
+       dlqCtx, dlqCancel := context.WithTimeout(context.Background(), 
500*time.Millisecond)
+       defer dlqCancel()
+       msg, err = dlqConsumer.Receive(dlqCtx)
+       assert.Error(t, err)
+       assert.Nil(t, msg)
+
+       // 4. No more messages for same subscription
+       checkConsumer, err := client.Subscribe(ConsumerOptions{
+               Topics:                      []string{topic01, topic02},
+               SubscriptionName:            subName,
+               Type:                        Shared,
+               SubscriptionInitialPosition: SubscriptionPositionEarliest,
+       })
+       assert.Nil(t, err)
+       defer checkConsumer.Close()
+
+       timeoutCtx, cancel := context.WithTimeout(context.Background(), 
500*time.Millisecond)
+       defer cancel()
+       checkMsg, err := checkConsumer.Receive(timeoutCtx)
+       assert.Error(t, err)
+       assert.Nil(t, checkMsg)
+}
+
 func TestGetDeliveryCount(t *testing.T) {
        client, err := NewClient(ClientOptions{
                URL: lookupURL,
diff --git a/pulsar/dlq_router.go b/pulsar/dlq_router.go
index d6e7b30..68b263b 100644
--- a/pulsar/dlq_router.go
+++ b/pulsar/dlq_router.go
@@ -46,13 +46,13 @@ func newDlqRouter(client Client, policy *DLQPolicy) 
(*dlqRouter, error) {
                        return nil, errors.New("DLQPolicy.MaxDeliveries needs 
to be > 0")
                }
 
-               if policy.Topic == "" {
+               if policy.DeadLetterTopic == "" {
                        return nil, errors.New("DLQPolicy.Topic needs to be set 
to a valid topic name")
                }
 
                r.messageCh = make(chan ConsumerMessage)
                r.closeCh = make(chan interface{}, 1)
-               r.log = log.WithField("dlq-topic", policy.Topic)
+               r.log = log.WithField("dlq-topic", policy.DeadLetterTopic)
                go r.run()
        }
        return r, nil
@@ -132,7 +132,7 @@ func (r *dlqRouter) getProducer() Producer {
        backoff := &internal.Backoff{}
        for {
                producer, err := r.client.CreateProducer(ProducerOptions{
-                       Topic:                   r.policy.Topic,
+                       Topic:                   r.policy.DeadLetterTopic,
                        CompressionType:         LZ4,
                        BatchingMaxPublishDelay: 100 * time.Millisecond,
                })
diff --git a/pulsar/impl_message.go b/pulsar/impl_message.go
index d670a37..d796969 100644
--- a/pulsar/impl_message.go
+++ b/pulsar/impl_message.go
@@ -18,6 +18,7 @@
 package pulsar
 
 import (
+       "fmt"
        "math/big"
        "strings"
        "sync"
@@ -103,6 +104,10 @@ func (id messageID) Serialize() []byte {
        return data
 }
 
+func (id messageID) String() string {
+       return fmt.Sprintf("%d:%d:%d", id.ledgerID, id.entryID, id.partitionIdx)
+}
+
 func deserializeMessageID(data []byte) (MessageID, error) {
        msgID := &pb.MessageIdData{}
        err := proto.Unmarshal(data, msgID)
diff --git a/pulsar/internal/connection.go b/pulsar/internal/connection.go
index 1bfec52..0d2baab 100644
--- a/pulsar/internal/connection.go
+++ b/pulsar/internal/connection.go
@@ -176,6 +176,7 @@ type connection struct {
        closeCh            chan interface{}
        writeRequestsCh    chan Buffer
 
+       pendingLock sync.Mutex
        pendingReqs map[uint64]*request
        listeners   map[uint64]ConnectionListener
 
@@ -356,24 +357,35 @@ func (c *connection) run() {
        defer func() {
                // all the accesses to the pendingReqs should be happened in 
this run loop thread,
                // including the final cleanup, to avoid the issue 
https://github.com/apache/pulsar-client-go/issues/239
+               c.pendingLock.Lock()
                for id, req := range c.pendingReqs {
                        req.callback(nil, errors.New("connection closed"))
                        delete(c.pendingReqs, id)
                }
+               c.pendingLock.Unlock()
                c.Close()
        }()
 
+       go func() {
+               for {
+                       select {
+                       case <-c.closeCh:
+                               return
+
+                       case req := <-c.incomingRequestsCh:
+                               if req == nil {
+                                       return // TODO: this never gonna be 
happen
+                               }
+                               c.internalSendRequest(req)
+                       }
+               }
+       }()
+
        for {
                select {
                case <-c.closeCh:
                        return
 
-               case req := <-c.incomingRequestsCh:
-                       if req == nil {
-                               return
-                       }
-                       c.internalSendRequest(req)
-
                case cmd := <-c.incomingCmdCh:
                        c.internalReceivedCommand(cmd.cmd, 
cmd.headersAndPayload)
 
@@ -556,33 +568,41 @@ func (c *connection) SendRequestNoWait(req 
*pb.BaseCommand) {
 }
 
 func (c *connection) internalSendRequest(req *request) {
+       c.pendingLock.Lock()
        if req.id != nil {
                c.pendingReqs[*req.id] = req
        }
+       c.pendingLock.Unlock()
        c.writeCommand(req.cmd)
 }
 
 func (c *connection) handleResponse(requestID uint64, response 
*pb.BaseCommand) {
+       c.pendingLock.Lock()
        request, ok := c.pendingReqs[requestID]
        if !ok {
                c.log.Warnf("Received unexpected response for request %d of 
type %s", requestID, response.Type)
+               c.pendingLock.Unlock()
                return
        }
 
        delete(c.pendingReqs, requestID)
+       c.pendingLock.Unlock()
        request.callback(response, nil)
 }
 
 func (c *connection) handleResponseError(serverError *pb.CommandError) {
        requestID := serverError.GetRequestId()
+       c.pendingLock.Lock()
        request, ok := c.pendingReqs[requestID]
        if !ok {
                c.log.Warnf("Received unexpected error response for request %d 
of type %s",
                        requestID, serverError.GetError())
+               c.pendingLock.Unlock()
                return
        }
 
        delete(c.pendingReqs, requestID)
+       c.pendingLock.Unlock()
 
        errMsg := fmt.Sprintf("server error: %s: %s", serverError.GetError(), 
serverError.GetMessage())
        request.callback(nil, errors.New(errMsg))
diff --git a/pulsar/internal/topic_name.go b/pulsar/internal/topic_name.go
index 78d2abb..68b7ea1 100644
--- a/pulsar/internal/topic_name.go
+++ b/pulsar/internal/topic_name.go
@@ -26,8 +26,9 @@ import (
 
 // TopicName abstract a struct contained in a Topic
 type TopicName struct {
-       Name      string
+       Domain    string
        Namespace string
+       Name      string
        Partition int
 }
 
@@ -68,6 +69,7 @@ func ParseTopicName(topic string) (*TopicName, error) {
        if domain != "persistent" && domain != "non-persistent" {
                return nil, errors.New("Invalid topic domain: " + domain)
        }
+       tn.Domain = domain
 
        rest := parts[1]
        var err error
diff --git a/pulsar/producer_impl.go b/pulsar/producer_impl.go
index 2a2b63e..458d2b5 100644
--- a/pulsar/producer_impl.go
+++ b/pulsar/producer_impl.go
@@ -286,8 +286,8 @@ func (p *producer) Flush() error {
 }
 
 func (p *producer) Close() {
-       p.RLock()
-       defer p.RUnlock()
+       p.Lock()
+       defer p.Unlock()
        if p.ticker != nil {
                p.ticker.Stop()
                close(p.tickerStop)
diff --git a/pulsar/dlq_router.go b/pulsar/retry_router.go
similarity index 50%
copy from pulsar/dlq_router.go
copy to pulsar/retry_router.go
index d6e7b30..b16417d 100644
--- a/pulsar/dlq_router.go
+++ b/pulsar/retry_router.go
@@ -26,95 +26,84 @@ import (
        log "github.com/sirupsen/logrus"
 )
 
-type dlqRouter struct {
+const (
+       DlqTopicSuffix    = "-DLQ"
+       RetryTopicSuffix  = "-RETRY"
+       MaxReconsumeTimes = 16
+
+       SysPropertyDelayTime       = "DELAY_TIME"
+       SysPropertyRealTopic       = "REAL_TOPIC"
+       SysPropertyRetryTopic      = "RETRY_TOPIC"
+       SysPropertyReconsumeTimes  = "RECONSUMETIMES"
+       SysPropertyOriginMessageID = "ORIGIN_MESSAGE_IDY_TIME"
+)
+
+type RetryMessage struct {
+       producerMsg ProducerMessage
+       consumerMsg ConsumerMessage
+}
+
+type retryRouter struct {
        client    Client
        producer  Producer
        policy    *DLQPolicy
-       messageCh chan ConsumerMessage
+       messageCh chan RetryMessage
        closeCh   chan interface{}
        log       *log.Entry
 }
 
-func newDlqRouter(client Client, policy *DLQPolicy) (*dlqRouter, error) {
-       r := &dlqRouter{
+func newRetryRouter(client Client, policy *DLQPolicy, retryEnabled bool) 
(*retryRouter, error) {
+       r := &retryRouter{
                client: client,
                policy: policy,
        }
 
-       if policy != nil {
+       if policy != nil && retryEnabled {
                if policy.MaxDeliveries <= 0 {
                        return nil, errors.New("DLQPolicy.MaxDeliveries needs 
to be > 0")
                }
 
-               if policy.Topic == "" {
-                       return nil, errors.New("DLQPolicy.Topic needs to be set 
to a valid topic name")
+               if policy.RetryLetterTopic == "" {
+                       return nil, errors.New("DLQPolicy.RetryLetterTopic 
needs to be set to a valid topic name")
                }
 
-               r.messageCh = make(chan ConsumerMessage)
+               r.messageCh = make(chan RetryMessage)
                r.closeCh = make(chan interface{}, 1)
-               r.log = log.WithField("dlq-topic", policy.Topic)
+               r.log = log.WithField("rlq-topic", policy.RetryLetterTopic)
                go r.run()
        }
        return r, nil
 }
 
-func (r *dlqRouter) shouldSendToDlq(cm *ConsumerMessage) bool {
-       if r.policy == nil {
-               return false
-       }
-
-       msg := cm.Message.(*message)
-       r.log.WithField("count", msg.redeliveryCount).
-               WithField("max", r.policy.MaxDeliveries).
-               WithField("msgId", msg.msgID).
-               Debug("Should route to DLQ?")
-
-       // We use >= here because we're comparing the number of re-deliveries 
with
-       // the number of deliveries. So:
-       //  * the user specifies that wants to process a message up to 10 times.
-       //  * the first time, the redeliveryCount == 0, then 1 and so on
-       //  * when we receive the message and redeliveryCount == 10, it means
-       //    that the application has already got (and Nack())  the message 10
-       //    times, so this time we should just go to DLQ.
-
-       return msg.redeliveryCount >= r.policy.MaxDeliveries
-}
-
-func (r *dlqRouter) Chan() chan ConsumerMessage {
+func (r *retryRouter) Chan() chan RetryMessage {
        return r.messageCh
 }
 
-func (r *dlqRouter) run() {
+func (r *retryRouter) run() {
        for {
                select {
-               case cm := <-r.messageCh:
-                       r.log.WithField("msgID", cm.ID()).Debug("Got message 
for DLQ")
+               case rm := <-r.messageCh:
+                       r.log.WithField("msgID", 
rm.consumerMsg.ID()).Debug("Got message for RLQ")
                        producer := r.getProducer()
 
-                       msg := cm.Message.(*message)
-                       msgID := msg.ID()
-                       producer.SendAsync(context.Background(), 
&ProducerMessage{
-                               Payload:             msg.Payload(),
-                               Key:                 msg.Key(),
-                               Properties:          msg.Properties(),
-                               EventTime:           msg.EventTime(),
-                               ReplicationClusters: msg.replicationClusters,
-                       }, func(MessageID, *ProducerMessage, error) {
-                               r.log.WithField("msgID", msgID).Debug("Sent 
message to DLQ")
-                               cm.Consumer.AckID(msgID)
+                       msgID := rm.consumerMsg.ID()
+                       producer.SendAsync(context.Background(), 
&rm.producerMsg, func(MessageID, *ProducerMessage, error) {
+                               // TODO: if router produce failed, should Nack 
this message
+                               r.log.WithField("msgID", msgID).Debug("Sent 
message to RLQ")
+                               rm.consumerMsg.Consumer.AckID(msgID)
                        })
 
                case <-r.closeCh:
                        if r.producer != nil {
                                r.producer.Close()
                        }
-                       r.log.Debug("Closed DLQ router")
+                       r.log.Debug("Closed RLQ router")
                        return
                }
        }
 }
 
-func (r *dlqRouter) close() {
+func (r *retryRouter) close() {
        // Attempt to write on the close channel, without blocking
        select {
        case r.closeCh <- nil:
@@ -122,7 +111,7 @@ func (r *dlqRouter) close() {
        }
 }
 
-func (r *dlqRouter) getProducer() Producer {
+func (r *retryRouter) getProducer() Producer {
        if r.producer != nil {
                // Producer was already initialized
                return r.producer
@@ -132,13 +121,13 @@ func (r *dlqRouter) getProducer() Producer {
        backoff := &internal.Backoff{}
        for {
                producer, err := r.client.CreateProducer(ProducerOptions{
-                       Topic:                   r.policy.Topic,
+                       Topic:                   r.policy.RetryLetterTopic,
                        CompressionType:         LZ4,
                        BatchingMaxPublishDelay: 100 * time.Millisecond,
                })
 
                if err != nil {
-                       r.log.WithError(err).Error("Failed to create DLQ 
producer")
+                       r.log.WithError(err).Error("Failed to create RLQ 
producer")
                        time.Sleep(backoff.Next())
                        continue
                } else {

Reply via email to