This is an automated email from the ASF dual-hosted git repository.
rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new e9629fb [Issue 615] Fix retry on specified-partition topic (#616)
e9629fb is described below
commit e9629fb7851a078c8746940b45134b136c84a0a6
Author: Minzhang <[email protected]>
AuthorDate: Sat Oct 9 16:43:43 2021 +0800
[Issue 615] Fix retry on specified-partition topic (#616)
* fix retry on consume specified-partition topic (#615)
* introduce variable for topic name
Co-authored-by: xiaolongran <[email protected]>
---
pulsar/consumer_multitopic.go | 11 +++--
pulsar/consumer_test.go | 101 ++++++++++++++++++++++++++++++++++++++++++
2 files changed, 109 insertions(+), 3 deletions(-)
diff --git a/pulsar/consumer_multitopic.go b/pulsar/consumer_multitopic.go
index faf8917..f689fae 100644
--- a/pulsar/consumer_multitopic.go
+++ b/pulsar/consumer_multitopic.go
@@ -149,11 +149,16 @@ func (c *multiTopicConsumer) ReconsumeLater(msg Message,
delay time.Duration) {
return
}
- fqdnTopic := internal.TopicNameWithoutPartitionPart(names[0])
+ tn := names[0]
+ fqdnTopic := internal.TopicNameWithoutPartitionPart(tn)
consumer, ok := c.consumers[fqdnTopic]
if !ok {
- c.log.Warnf("consumer of topic %s not exist unexpectedly",
msg.Topic())
- return
+ // check to see if the topic with the partition part is in the
consumers
+ // this can happen when the consumer is configured to consume
from a specific partition
+ if consumer, ok = c.consumers[tn.Name]; !ok {
+ c.log.Warnf("consumer of topic %s not exist
unexpectedly", msg.Topic())
+ return
+ }
}
consumer.ReconsumeLater(msg, delay)
}
diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go
index 1811722..66587f9 100644
--- a/pulsar/consumer_test.go
+++ b/pulsar/consumer_test.go
@@ -1353,6 +1353,107 @@ func TestRLQMultiTopics(t *testing.T) {
assert.Nil(t, checkMsg)
}
+func TestRLQSpecifiedPartitionTopic(t *testing.T) {
+ topic := newTopicName()
+ testURL := adminURL + "/" + "admin/v2/persistent/public/default/" +
topic + "/partitions"
+ makeHTTPCall(t, http.MethodPut, testURL, "1")
+
+ normalTopic := "persistent://public/default/" + topic
+ partitionTopic := normalTopic + "-partition-0"
+
+ subName := fmt.Sprintf("sub01-%d", time.Now().Unix())
+ maxRedeliveries := 2
+ N := 100
+ ctx := context.Background()
+
+ client, err := NewClient(ClientOptions{URL: lookupURL})
+ assert.Nil(t, err)
+ defer client.Close()
+
+ // subscribe topic with partition
+ rlqConsumer, err := client.Subscribe(ConsumerOptions{
+ Topic: partitionTopic,
+ SubscriptionName: subName,
+ Type: Shared,
+ SubscriptionInitialPosition: SubscriptionPositionEarliest,
+ DLQ: &DLQPolicy{MaxDeliveries:
uint32(maxRedeliveries)},
+ RetryEnable: true,
+ NackRedeliveryDelay: 1 * time.Second,
+ })
+ assert.Nil(t, err)
+ defer rlqConsumer.Close()
+
+ // subscribe DLQ Topic
+ dlqConsumer, err := client.Subscribe(ConsumerOptions{
+ Topic: subName + "-DLQ",
+ SubscriptionName: subName,
+ SubscriptionInitialPosition: SubscriptionPositionEarliest,
+ })
+ assert.Nil(t, err)
+ defer dlqConsumer.Close()
+
+ // create producer
+ producer, err := client.CreateProducer(ProducerOptions{Topic:
normalTopic})
+ assert.Nil(t, err)
+ defer producer.Close()
+
+ // 1. Pre-produce N messages
+ for i := 0; i < N; i++ {
+ _, err = producer.Send(ctx, &ProducerMessage{Payload:
[]byte(fmt.Sprintf("MSG_01_%d", i))})
+ assert.Nil(t, err)
+ }
+
+ // 2. Create consumer on the Retry Topics to reconsume N messages
(maxRedeliveries+1) times
+ rlqReceived := 0
+ for rlqReceived < N*(maxRedeliveries+1) {
+ msg, err := rlqConsumer.Receive(ctx)
+ assert.Nil(t, err)
+ rlqConsumer.ReconsumeLater(msg, 1*time.Second)
+ rlqReceived++
+ }
+ fmt.Println("retry consumed:", rlqReceived) // 300
+
+ // No more messages on the Retry Topic
+ rlqCtx, rlqCancel := context.WithTimeout(context.Background(),
500*time.Millisecond)
+ defer rlqCancel()
+ msg, err := rlqConsumer.Receive(rlqCtx)
+ assert.Error(t, err)
+ assert.Nil(t, msg)
+
+ // 3. Create consumer on the DLQ topic to verify the routing
+ dlqReceived := 0
+ for dlqReceived < N {
+ msg, err := dlqConsumer.Receive(ctx)
+ assert.Nil(t, err)
+ dlqConsumer.Ack(msg)
+ dlqReceived++
+ }
+ fmt.Println("dlq received:", dlqReceived) // 100
+
+ // No more messages on the DLQ Topic
+ dlqCtx, dlqCancel := context.WithTimeout(context.Background(),
500*time.Millisecond)
+ defer dlqCancel()
+ msg, err = dlqConsumer.Receive(dlqCtx)
+ assert.Error(t, err)
+ assert.Nil(t, msg)
+
+ // 4. No more messages for same subscription
+ checkConsumer, err := client.Subscribe(ConsumerOptions{
+ Topic: normalTopic,
+ SubscriptionName: subName,
+ Type: Shared,
+ SubscriptionInitialPosition: SubscriptionPositionEarliest,
+ })
+ assert.Nil(t, err)
+ defer checkConsumer.Close()
+
+ timeoutCtx, cancel := context.WithTimeout(context.Background(),
500*time.Millisecond)
+ defer cancel()
+ checkMsg, err := checkConsumer.Receive(timeoutCtx)
+ assert.Error(t, err)
+ assert.Nil(t, checkMsg)
+}
+
func TestGetDeliveryCount(t *testing.T) {
client, err := NewClient(ClientOptions{
URL: lookupURL,