This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new e9629fb  [Issue 615] Fix retry on specified-partition topic (#616)
e9629fb is described below

commit e9629fb7851a078c8746940b45134b136c84a0a6
Author: Minzhang <[email protected]>
AuthorDate: Sat Oct 9 16:43:43 2021 +0800

    [Issue 615] Fix retry on specified-partition topic (#616)
    
    * fix retry on consume specified-partition topic  (#615)
    
    * introduce variable for topic name
    
    Co-authored-by: xiaolongran <[email protected]>
---
 pulsar/consumer_multitopic.go |  11 +++--
 pulsar/consumer_test.go       | 101 ++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 109 insertions(+), 3 deletions(-)

diff --git a/pulsar/consumer_multitopic.go b/pulsar/consumer_multitopic.go
index faf8917..f689fae 100644
--- a/pulsar/consumer_multitopic.go
+++ b/pulsar/consumer_multitopic.go
@@ -149,11 +149,16 @@ func (c *multiTopicConsumer) ReconsumeLater(msg Message, 
delay time.Duration) {
                return
        }
 
-       fqdnTopic := internal.TopicNameWithoutPartitionPart(names[0])
+       tn := names[0]
+       fqdnTopic := internal.TopicNameWithoutPartitionPart(tn)
        consumer, ok := c.consumers[fqdnTopic]
        if !ok {
-               c.log.Warnf("consumer of topic %s not exist unexpectedly", 
msg.Topic())
-               return
+               // check to see if the topic with the partition part is in the 
consumers
+               // this can happen when the consumer is configured to consume 
from a specific partition
+               if consumer, ok = c.consumers[tn.Name]; !ok {
+                       c.log.Warnf("consumer of topic %s not exist 
unexpectedly", msg.Topic())
+                       return
+               }
        }
        consumer.ReconsumeLater(msg, delay)
 }
diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go
index 1811722..66587f9 100644
--- a/pulsar/consumer_test.go
+++ b/pulsar/consumer_test.go
@@ -1353,6 +1353,107 @@ func TestRLQMultiTopics(t *testing.T) {
        assert.Nil(t, checkMsg)
 }
 
+func TestRLQSpecifiedPartitionTopic(t *testing.T) {
+       topic := newTopicName()
+       testURL := adminURL + "/" + "admin/v2/persistent/public/default/" + 
topic + "/partitions"
+       makeHTTPCall(t, http.MethodPut, testURL, "1")
+
+       normalTopic := "persistent://public/default/" + topic
+       partitionTopic := normalTopic + "-partition-0"
+
+       subName := fmt.Sprintf("sub01-%d", time.Now().Unix())
+       maxRedeliveries := 2
+       N := 100
+       ctx := context.Background()
+
+       client, err := NewClient(ClientOptions{URL: lookupURL})
+       assert.Nil(t, err)
+       defer client.Close()
+
+       // subscribe topic with partition
+       rlqConsumer, err := client.Subscribe(ConsumerOptions{
+               Topic:                       partitionTopic,
+               SubscriptionName:            subName,
+               Type:                        Shared,
+               SubscriptionInitialPosition: SubscriptionPositionEarliest,
+               DLQ:                         &DLQPolicy{MaxDeliveries: 
uint32(maxRedeliveries)},
+               RetryEnable:                 true,
+               NackRedeliveryDelay:         1 * time.Second,
+       })
+       assert.Nil(t, err)
+       defer rlqConsumer.Close()
+
+       // subscribe DLQ Topic
+       dlqConsumer, err := client.Subscribe(ConsumerOptions{
+               Topic:                       subName + "-DLQ",
+               SubscriptionName:            subName,
+               SubscriptionInitialPosition: SubscriptionPositionEarliest,
+       })
+       assert.Nil(t, err)
+       defer dlqConsumer.Close()
+
+       // create producer
+       producer, err := client.CreateProducer(ProducerOptions{Topic: 
normalTopic})
+       assert.Nil(t, err)
+       defer producer.Close()
+
+       // 1. Pre-produce N messages
+       for i := 0; i < N; i++ {
+               _, err = producer.Send(ctx, &ProducerMessage{Payload: 
[]byte(fmt.Sprintf("MSG_01_%d", i))})
+               assert.Nil(t, err)
+       }
+
+       // 2. Create consumer on the Retry Topics to reconsume N messages 
(maxRedeliveries+1) times
+       rlqReceived := 0
+       for rlqReceived < N*(maxRedeliveries+1) {
+               msg, err := rlqConsumer.Receive(ctx)
+               assert.Nil(t, err)
+               rlqConsumer.ReconsumeLater(msg, 1*time.Second)
+               rlqReceived++
+       }
+       fmt.Println("retry consumed:", rlqReceived) // 300
+
+       // No more messages on the Retry Topic
+       rlqCtx, rlqCancel := context.WithTimeout(context.Background(), 
500*time.Millisecond)
+       defer rlqCancel()
+       msg, err := rlqConsumer.Receive(rlqCtx)
+       assert.Error(t, err)
+       assert.Nil(t, msg)
+
+       // 3. Create consumer on the DLQ topic to verify the routing
+       dlqReceived := 0
+       for dlqReceived < N {
+               msg, err := dlqConsumer.Receive(ctx)
+               assert.Nil(t, err)
+               dlqConsumer.Ack(msg)
+               dlqReceived++
+       }
+       fmt.Println("dlq received:", dlqReceived) // 100
+
+       // No more messages on the DLQ Topic
+       dlqCtx, dlqCancel := context.WithTimeout(context.Background(), 
500*time.Millisecond)
+       defer dlqCancel()
+       msg, err = dlqConsumer.Receive(dlqCtx)
+       assert.Error(t, err)
+       assert.Nil(t, msg)
+
+       // 4. No more messages for same subscription
+       checkConsumer, err := client.Subscribe(ConsumerOptions{
+               Topic:                       normalTopic,
+               SubscriptionName:            subName,
+               Type:                        Shared,
+               SubscriptionInitialPosition: SubscriptionPositionEarliest,
+       })
+       assert.Nil(t, err)
+       defer checkConsumer.Close()
+
+       timeoutCtx, cancel := context.WithTimeout(context.Background(), 
500*time.Millisecond)
+       defer cancel()
+       checkMsg, err := checkConsumer.Receive(timeoutCtx)
+       assert.Error(t, err)
+       assert.Nil(t, checkMsg)
+}
+
 func TestGetDeliveryCount(t *testing.T) {
        client, err := NewClient(ClientOptions{
                URL: lookupURL,

Reply via email to