This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new 567263f  Support nack backoff policy for SDK (#660)
567263f is described below

commit 567263ff5b077e9a737f96c9523646225502f141
Author: xiaolong ran <[email protected]>
AuthorDate: Mon Nov 8 12:42:03 2021 +0800

    Support nack backoff policy for SDK (#660)
    
    * Support nack backoff policy for SDK
    
    Signed-off-by: xiaolongran <[email protected]>
    
    * add test case and fix some logic
    
    Signed-off-by: xiaolongran <[email protected]>
    
    * fix action ciu
    
    Signed-off-by: xiaolongran <[email protected]>
    
    * fix action ci
    
    Signed-off-by: xiaolongran <[email protected]>
    
    * fix some logic
    
    Signed-off-by: xiaolongran <[email protected]>
    
    * fix data race
    
    Signed-off-by: xiaolongran <[email protected]>
    
    * fix data race
    
    Signed-off-by: xiaolongran <[email protected]>
    
    * fix a little
    
    Signed-off-by: xiaolongran <[email protected]>
    
    * fix data race
    
    Signed-off-by: xiaolongran <[email protected]>
    
    * fix data race
    
    Signed-off-by: xiaolongran <[email protected]>
    
    * fix some comments
    
    Signed-off-by: xiaolongran <[email protected]>
    
    * fix test case
    
    Signed-off-by: xiaolongran <[email protected]>
---
 pulsar/consumer.go                     |  11 +++
 pulsar/consumer_impl.go                |  20 +++++
 pulsar/consumer_multitopic.go          |  16 ++++
 pulsar/consumer_partition.go           |   8 +-
 pulsar/consumer_regex.go               |  16 ++++
 pulsar/impl_message.go                 |   7 ++
 pulsar/negative_acks_tracker.go        |  44 +++++++++-
 pulsar/negative_acks_tracker_test.go   | 156 ++++++++++++++++++++++++++++++++-
 pulsar/negative_backoff_policy.go      |  46 ++++++++++
 pulsar/negative_backoff_policy_test.go |  34 +++++++
 10 files changed, 351 insertions(+), 7 deletions(-)

diff --git a/pulsar/consumer.go b/pulsar/consumer.go
index c9fbc0d..5127955 100644
--- a/pulsar/consumer.go
+++ b/pulsar/consumer.go
@@ -158,6 +158,17 @@ type ConsumerOptions struct {
 
        // Decryption decryption related fields to decrypt the encrypted message
        Decryption *MessageDecryptionInfo
+
+       // If enabled, the default implementation of NackBackoffPolicy will be 
used to calculate the delay time of
+       // nack backoff, Default: false.
+       EnableDefaultNackBackoffPolicy bool
+
+       // NackBackoffPolicy is a redelivery backoff mechanism which we can 
achieve redelivery with different
+       // delays according to the number of times the message is retried.
+       //
+       // > Notice: the NackBackoffPolicy will not work with 
`consumer.NackID(MessageID)`
+       // > because we are not able to get the redeliveryCount from the 
message ID.
+       NackBackoffPolicy NackBackoffPolicy
 }
 
 // Consumer is an interface that abstracts behavior of Pulsar's consumer
diff --git a/pulsar/consumer_impl.go b/pulsar/consumer_impl.go
index 6aef497..1bf75b5 100644
--- a/pulsar/consumer_impl.go
+++ b/pulsar/consumer_impl.go
@@ -35,6 +35,7 @@ const defaultNackRedeliveryDelay = 1 * time.Minute
 type acker interface {
        AckID(id trackingMessageID)
        NackID(id trackingMessageID)
+       NackMsg(msg Message)
 }
 
 type consumer struct {
@@ -87,6 +88,10 @@ func newConsumer(client *client, options ConsumerOptions) 
(Consumer, error) {
                }
        }
 
+       if options.NackBackoffPolicy == nil && 
options.EnableDefaultNackBackoffPolicy {
+               options.NackBackoffPolicy = new(defaultNackBackoffPolicy)
+       }
+
        // did the user pass in a message channel?
        messageCh := options.MessageChannel
        if options.MessageChannel == nil {
@@ -326,6 +331,7 @@ func (c *consumer) internalTopicSubscribeToPartitions() 
error {
                                partitionIdx:               idx,
                                receiverQueueSize:          receiverQueueSize,
                                nackRedeliveryDelay:        nackRedeliveryDelay,
+                               nackBackoffPolicy:          
c.options.NackBackoffPolicy,
                                metadata:                   metadata,
                                replicateSubscriptionState: 
c.options.ReplicateSubscriptionState,
                                startMessageID:             trackingMessageID{},
@@ -489,6 +495,20 @@ func (c *consumer) ReconsumeLater(msg Message, delay 
time.Duration) {
 }
 
 func (c *consumer) Nack(msg Message) {
+       if c.options.EnableDefaultNackBackoffPolicy || 
c.options.NackBackoffPolicy != nil {
+               mid, ok := c.messageID(msg.ID())
+               if !ok {
+                       return
+               }
+
+               if mid.consumer != nil {
+                       mid.Nack()
+                       return
+               }
+               c.consumers[mid.partitionIdx].NackMsg(msg)
+               return
+       }
+
        c.NackID(msg.ID())
 }
 
diff --git a/pulsar/consumer_multitopic.go b/pulsar/consumer_multitopic.go
index f689fae..c1cb3d8 100644
--- a/pulsar/consumer_multitopic.go
+++ b/pulsar/consumer_multitopic.go
@@ -164,6 +164,22 @@ func (c *multiTopicConsumer) ReconsumeLater(msg Message, 
delay time.Duration) {
 }
 
 func (c *multiTopicConsumer) Nack(msg Message) {
+       if c.options.EnableDefaultNackBackoffPolicy || 
c.options.NackBackoffPolicy != nil {
+               msgID := msg.ID()
+               mid, ok := toTrackingMessageID(msgID)
+               if !ok {
+                       c.log.Warnf("invalid message id type %T", msgID)
+                       return
+               }
+
+               if mid.consumer == nil {
+                       c.log.Warnf("unable to nack messageID=%+v can not 
determine topic", msgID)
+                       return
+               }
+               mid.NackByMsg(msg)
+               return
+       }
+
        c.NackID(msg.ID())
 }
 
diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go
index 30639bd..c8e5d9f 100644
--- a/pulsar/consumer_partition.go
+++ b/pulsar/consumer_partition.go
@@ -89,6 +89,7 @@ type partitionConsumerOpts struct {
        partitionIdx               int
        receiverQueueSize          int
        nackRedeliveryDelay        time.Duration
+       nackBackoffPolicy          NackBackoffPolicy
        metadata                   map[string]string
        replicateSubscriptionState bool
        startMessageID             trackingMessageID
@@ -201,7 +202,7 @@ func newPartitionConsumer(parent Consumer, client *client, 
options *partitionCon
 
        pc.decryptor = decryptor
 
-       pc.nackTracker = newNegativeAcksTracker(pc, 
options.nackRedeliveryDelay, pc.log)
+       pc.nackTracker = newNegativeAcksTracker(pc, 
options.nackRedeliveryDelay, options.nackBackoffPolicy, pc.log)
 
        err := pc.grabConn()
        if err != nil {
@@ -331,6 +332,11 @@ func (pc *partitionConsumer) NackID(msgID 
trackingMessageID) {
        pc.metrics.NacksCounter.Inc()
 }
 
+func (pc *partitionConsumer) NackMsg(msg Message) {
+       pc.nackTracker.AddMessage(msg)
+       pc.metrics.NacksCounter.Inc()
+}
+
 func (pc *partitionConsumer) Redeliver(msgIds []messageID) {
        pc.eventsCh <- &redeliveryRequest{msgIds}
 
diff --git a/pulsar/consumer_regex.go b/pulsar/consumer_regex.go
index 2f46c48..ed2ae1a 100644
--- a/pulsar/consumer_regex.go
+++ b/pulsar/consumer_regex.go
@@ -183,6 +183,22 @@ func (c *regexConsumer) AckID(msgID MessageID) {
 }
 
 func (c *regexConsumer) Nack(msg Message) {
+       if c.options.EnableDefaultNackBackoffPolicy || 
c.options.NackBackoffPolicy != nil {
+               msgID := msg.ID()
+               mid, ok := toTrackingMessageID(msgID)
+               if !ok {
+                       c.log.Warnf("invalid message id type %T", msgID)
+                       return
+               }
+
+               if mid.consumer == nil {
+                       c.log.Warnf("unable to nack messageID=%+v can not 
determine topic", msgID)
+                       return
+               }
+               mid.NackByMsg(msg)
+               return
+       }
+
        c.NackID(msg.ID())
 }
 
diff --git a/pulsar/impl_message.go b/pulsar/impl_message.go
index 19fa6d8..a9809aa 100644
--- a/pulsar/impl_message.go
+++ b/pulsar/impl_message.go
@@ -79,6 +79,13 @@ func (id trackingMessageID) Nack() {
        id.consumer.NackID(id)
 }
 
+func (id trackingMessageID) NackByMsg(msg Message) {
+       if id.consumer == nil {
+               return
+       }
+       id.consumer.NackMsg(msg)
+}
+
 func (id trackingMessageID) ack() bool {
        if id.tracker != nil && id.batchIdx > -1 {
                return id.tracker.ack(int(id.batchIdx))
diff --git a/pulsar/negative_acks_tracker.go b/pulsar/negative_acks_tracker.go
index e10ab49..3485e1b 100644
--- a/pulsar/negative_acks_tracker.go
+++ b/pulsar/negative_acks_tracker.go
@@ -35,21 +35,32 @@ type negativeAcksTracker struct {
        doneOnce     sync.Once
        negativeAcks map[messageID]time.Time
        rc           redeliveryConsumer
+       nackBackoff  NackBackoffPolicy
        tick         *time.Ticker
        delay        time.Duration
        log          log.Logger
 }
 
-func newNegativeAcksTracker(rc redeliveryConsumer, delay time.Duration, logger 
log.Logger) *negativeAcksTracker {
+func newNegativeAcksTracker(rc redeliveryConsumer, delay time.Duration,
+       nackBackoffPolicy NackBackoffPolicy, logger log.Logger) 
*negativeAcksTracker {
+
        t := &negativeAcksTracker{
                doneCh:       make(chan interface{}),
                negativeAcks: make(map[messageID]time.Time),
                rc:           rc,
-               tick:         time.NewTicker(delay / 3),
-               delay:        delay,
+               nackBackoff:  nackBackoffPolicy,
                log:          logger,
        }
 
+       if nackBackoffPolicy != nil {
+               firstDelayForNackBackoff := nackBackoffPolicy.Next(1)
+               t.delay = time.Duration(firstDelayForNackBackoff)
+       } else {
+               t.delay = delay
+       }
+
+       t.tick = time.NewTicker(t.delay / 3)
+
        go t.track()
        return t
 }
@@ -76,6 +87,32 @@ func (t *negativeAcksTracker) Add(msgID messageID) {
        t.negativeAcks[batchMsgID] = targetTime
 }
 
+func (t *negativeAcksTracker) AddMessage(msg Message) {
+       nackBackoffDelay := t.nackBackoff.Next(msg.RedeliveryCount())
+
+       msgID := msg.ID()
+
+       // Always clear up the batch index since we want to track the nack
+       // for the entire batch
+       batchMsgID := messageID{
+               ledgerID: msgID.LedgerID(),
+               entryID:  msgID.EntryID(),
+               batchIdx: 0,
+       }
+
+       t.Lock()
+       defer t.Unlock()
+
+       _, present := t.negativeAcks[batchMsgID]
+       if present {
+               // The batch is already being tracked
+               return
+       }
+
+       targetTime := time.Now().Add(time.Duration(nackBackoffDelay))
+       t.negativeAcks[batchMsgID] = targetTime
+}
+
 func (t *negativeAcksTracker) track() {
        for {
                select {
@@ -105,7 +142,6 @@ func (t *negativeAcksTracker) track() {
                                        t.rc.Redeliver(msgIds)
                                }
                        }
-
                }
        }
 }
diff --git a/pulsar/negative_acks_tracker_test.go 
b/pulsar/negative_acks_tracker_test.go
index e587f3f..51965ea 100644
--- a/pulsar/negative_acks_tracker_test.go
+++ b/pulsar/negative_acks_tracker_test.go
@@ -75,7 +75,7 @@ func (nmc *nackMockedConsumer) Wait() <-chan messageID {
 
 func TestNacksTracker(t *testing.T) {
        nmc := newNackMockedConsumer()
-       nacks := newNegativeAcksTracker(nmc, testNackDelay, 
log.DefaultNopLogger())
+       nacks := newNegativeAcksTracker(nmc, testNackDelay, nil, 
log.DefaultNopLogger())
 
        nacks.Add(messageID{
                ledgerID: 1,
@@ -108,7 +108,7 @@ func TestNacksTracker(t *testing.T) {
 
 func TestNacksWithBatchesTracker(t *testing.T) {
        nmc := newNackMockedConsumer()
-       nacks := newNegativeAcksTracker(nmc, testNackDelay, 
log.DefaultNopLogger())
+       nacks := newNegativeAcksTracker(nmc, testNackDelay, nil, 
log.DefaultNopLogger())
 
        nacks.Add(messageID{
                ledgerID: 1,
@@ -148,3 +148,155 @@ func TestNacksWithBatchesTracker(t *testing.T) {
 
        nacks.Close()
 }
+
+func TestNackBackoffTracker(t *testing.T) {
+       nmc := newNackMockedConsumer()
+       nacks := newNegativeAcksTracker(nmc, testNackDelay, 
new(defaultNackBackoffPolicy), log.DefaultNopLogger())
+
+       nacks.AddMessage(new(mockMessage1))
+       nacks.AddMessage(new(mockMessage2))
+
+       msgIds := make([]messageID, 0)
+       for id := range nmc.Wait() {
+               msgIds = append(msgIds, id)
+       }
+       msgIds = sortMessageIds(msgIds)
+
+       assert.Equal(t, 2, len(msgIds))
+       assert.Equal(t, int64(1), msgIds[0].ledgerID)
+       assert.Equal(t, int64(1), msgIds[0].entryID)
+       assert.Equal(t, int64(2), msgIds[1].ledgerID)
+       assert.Equal(t, int64(2), msgIds[1].entryID)
+
+       nacks.Close()
+       // allow multiple Close without panicing
+       nacks.Close()
+}
+
+type mockMessage1 struct {
+       properties map[string]string
+}
+
+func (msg *mockMessage1) Topic() string {
+       return ""
+}
+
+func (msg *mockMessage1) Properties() map[string]string {
+       return msg.properties
+}
+
+func (msg *mockMessage1) Payload() []byte {
+       return nil
+}
+
+func (msg *mockMessage1) ID() MessageID {
+       return messageID{
+               ledgerID: 1,
+               entryID:  1,
+               batchIdx: 1,
+       }
+}
+
+func (msg *mockMessage1) PublishTime() time.Time {
+       return time.Time{}
+}
+
+func (msg *mockMessage1) EventTime() time.Time {
+       return time.Time{}
+}
+
+func (msg *mockMessage1) Key() string {
+       return ""
+}
+
+func (msg *mockMessage1) OrderingKey() string {
+       return ""
+}
+
+func (msg *mockMessage1) RedeliveryCount() uint32 {
+       return 0
+}
+
+func (msg *mockMessage1) IsReplicated() bool {
+       return false
+}
+
+func (msg *mockMessage1) GetReplicatedFrom() string {
+       return ""
+}
+
+func (msg *mockMessage1) GetSchemaValue(v interface{}) error {
+       return nil
+}
+
+func (msg *mockMessage1) ProducerName() string {
+       return ""
+}
+
+func (msg *mockMessage1) GetEncryptionContext() *EncryptionContext {
+       return &EncryptionContext{}
+}
+
+type mockMessage2 struct {
+       properties map[string]string
+}
+
+func (msg *mockMessage2) Topic() string {
+       return ""
+}
+
+func (msg *mockMessage2) Properties() map[string]string {
+       return msg.properties
+}
+
+func (msg *mockMessage2) Payload() []byte {
+       return nil
+}
+
+func (msg *mockMessage2) ID() MessageID {
+       return messageID{
+               ledgerID: 2,
+               entryID:  2,
+               batchIdx: 1,
+       }
+}
+
+func (msg *mockMessage2) PublishTime() time.Time {
+       return time.Time{}
+}
+
+func (msg *mockMessage2) EventTime() time.Time {
+       return time.Time{}
+}
+
+func (msg *mockMessage2) Key() string {
+       return ""
+}
+
+func (msg *mockMessage2) OrderingKey() string {
+       return ""
+}
+
+func (msg *mockMessage2) RedeliveryCount() uint32 {
+       return 0
+}
+
+func (msg *mockMessage2) IsReplicated() bool {
+       return false
+}
+
+func (msg *mockMessage2) GetReplicatedFrom() string {
+       return ""
+}
+
+func (msg *mockMessage2) GetSchemaValue(v interface{}) error {
+       return nil
+}
+
+func (msg *mockMessage2) ProducerName() string {
+       return ""
+}
+
+func (msg *mockMessage2) GetEncryptionContext() *EncryptionContext {
+       return &EncryptionContext{}
+}
diff --git a/pulsar/negative_backoff_policy.go 
b/pulsar/negative_backoff_policy.go
new file mode 100644
index 0000000..cf080ad
--- /dev/null
+++ b/pulsar/negative_backoff_policy.go
@@ -0,0 +1,46 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package pulsar
+
+import "math"
+
+// NackBackoffPolicy is a interface for custom message negativeAcked policy, 
users can specify a NackBackoffPolicy
+// for a consumer.
+//
+// > Notice: the consumer crashes will trigger the redelivery of the unacked 
message, this case will not respect the
+// > NackBackoffPolicy, which means the message might get redelivered earlier 
than the delay time
+// > from the backoff.
+type NackBackoffPolicy interface {
+       // The redeliveryCount indicates the number of times the message was 
redelivered.
+       // We can get the redeliveryCount from the CommandMessage.
+       Next(redeliveryCount uint32) int64
+}
+
+// defaultNackBackoffPolicy is default impl for NackBackoffPolicy.
+type defaultNackBackoffPolicy struct{}
+
+func (nbp *defaultNackBackoffPolicy) Next(redeliveryCount uint32) int64 {
+       minNackTimeMs := int64(1000 * 30) // 30sec
+       maxNackTimeMs := 1000 * 60 * 10   // 10min
+
+       if redeliveryCount < 0 {
+               return minNackTimeMs
+       }
+
+       return 
int64(math.Min(math.Abs(float64(minNackTimeMs<<redeliveryCount)), 
float64(maxNackTimeMs)))
+}
diff --git a/pulsar/negative_backoff_policy_test.go 
b/pulsar/negative_backoff_policy_test.go
new file mode 100644
index 0000000..bfbb6a8
--- /dev/null
+++ b/pulsar/negative_backoff_policy_test.go
@@ -0,0 +1,34 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package pulsar
+
+import (
+       "testing"
+
+       "github.com/stretchr/testify/assert"
+)
+
+func TestDefaultNackBackoffPolicy_Next(t *testing.T) {
+       defaultNackBackoff := new(defaultNackBackoffPolicy)
+
+       res0 := defaultNackBackoff.Next(0)
+       assert.Equal(t, int64(1000*30), res0)
+
+       res5 := defaultNackBackoff.Next(5)
+       assert.Equal(t, int64(600000), res5)
+}

Reply via email to