This is an automated email from the ASF dual-hosted git repository.
rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new 567263f Support nack backoff policy for SDK (#660)
567263f is described below
commit 567263ff5b077e9a737f96c9523646225502f141
Author: xiaolong ran <[email protected]>
AuthorDate: Mon Nov 8 12:42:03 2021 +0800
Support nack backoff policy for SDK (#660)
* Support nack backoff policy for SDK
Signed-off-by: xiaolongran <[email protected]>
* add test case and fix some logic
Signed-off-by: xiaolongran <[email protected]>
* fix action ci
Signed-off-by: xiaolongran <[email protected]>
* fix action ci
Signed-off-by: xiaolongran <[email protected]>
* fix some logic
Signed-off-by: xiaolongran <[email protected]>
* fix data race
Signed-off-by: xiaolongran <[email protected]>
* fix data race
Signed-off-by: xiaolongran <[email protected]>
* fix a little
Signed-off-by: xiaolongran <[email protected]>
* fix data race
Signed-off-by: xiaolongran <[email protected]>
* fix data race
Signed-off-by: xiaolongran <[email protected]>
* fix some comments
Signed-off-by: xiaolongran <[email protected]>
* fix test case
Signed-off-by: xiaolongran <[email protected]>
---
pulsar/consumer.go | 11 +++
pulsar/consumer_impl.go | 20 +++++
pulsar/consumer_multitopic.go | 16 ++++
pulsar/consumer_partition.go | 8 +-
pulsar/consumer_regex.go | 16 ++++
pulsar/impl_message.go | 7 ++
pulsar/negative_acks_tracker.go | 44 +++++++++-
pulsar/negative_acks_tracker_test.go | 156 ++++++++++++++++++++++++++++++++-
pulsar/negative_backoff_policy.go | 46 ++++++++++
pulsar/negative_backoff_policy_test.go | 34 +++++++
10 files changed, 351 insertions(+), 7 deletions(-)
diff --git a/pulsar/consumer.go b/pulsar/consumer.go
index c9fbc0d..5127955 100644
--- a/pulsar/consumer.go
+++ b/pulsar/consumer.go
@@ -158,6 +158,17 @@ type ConsumerOptions struct {
 
 	// Decryption decryption related fields to decrypt the encrypted message
 	Decryption *MessageDecryptionInfo
+
+	// EnableDefaultNackBackoffPolicy, when true and NackBackoffPolicy is nil, makes the consumer compute
+	// nack redelivery delays with the built-in NackBackoffPolicy. Default: false.
+	EnableDefaultNackBackoffPolicy bool
+
+	// NackBackoffPolicy computes the redelivery delay of a negatively acknowledged message from its
+	// redelivery count, so repeated retries can be spaced out. When set, NackRedeliveryDelay is not used.
+	//
+	// > Notice: the NackBackoffPolicy will not work with `consumer.NackID(MessageID)`
+	// > because we are not able to get the redeliveryCount from the message ID.
+	NackBackoffPolicy NackBackoffPolicy
 }
 
 // Consumer is an interface that abstracts behavior of Pulsar's consumer
diff --git a/pulsar/consumer_impl.go b/pulsar/consumer_impl.go
index 6aef497..1bf75b5 100644
--- a/pulsar/consumer_impl.go
+++ b/pulsar/consumer_impl.go
@@ -35,6 +35,8 @@ const defaultNackRedeliveryDelay = 1 * time.Minute
 type acker interface {
 	AckID(id trackingMessageID)
 	NackID(id trackingMessageID)
+	// NackMsg negatively acknowledges a whole message, so its redelivery count can drive a nack backoff policy.
+	NackMsg(msg Message)
 }
 
 type consumer struct {
@@ -87,6 +89,11 @@ func newConsumer(client *client, options ConsumerOptions) (Consumer, error) {
 		}
 	}
 
+	// An explicitly configured policy always wins over the built-in default.
+	if options.NackBackoffPolicy == nil && options.EnableDefaultNackBackoffPolicy {
+		options.NackBackoffPolicy = new(defaultNackBackoffPolicy)
+	}
+
 	// did the user pass in a message channel?
 	messageCh := options.MessageChannel
 	if options.MessageChannel == nil {
@@ -326,6 +333,7 @@ func (c *consumer) internalTopicSubscribeToPartitions() error {
 				partitionIdx:               idx,
 				receiverQueueSize:          receiverQueueSize,
 				nackRedeliveryDelay:        nackRedeliveryDelay,
+				nackBackoffPolicy:          c.options.NackBackoffPolicy,
 				metadata:                   metadata,
 				replicateSubscriptionState: c.options.ReplicateSubscriptionState,
 				startMessageID:             trackingMessageID{},
@@ -489,6 +497,21 @@ func (c *consumer) ReconsumeLater(msg Message, delay time.Duration) {
 }
 
 func (c *consumer) Nack(msg Message) {
+	if c.options.EnableDefaultNackBackoffPolicy || c.options.NackBackoffPolicy != nil {
+		mid, ok := c.messageID(msg.ID())
+		if !ok {
+			return
+		}
+
+		// NackByMsg (not Nack) so the message's redelivery count reaches the backoff policy.
+		if mid.consumer != nil {
+			mid.NackByMsg(msg)
+			return
+		}
+		c.consumers[mid.partitionIdx].NackMsg(msg)
+		return
+	}
+
 	c.NackID(msg.ID())
 }
 
diff --git a/pulsar/consumer_multitopic.go b/pulsar/consumer_multitopic.go
index f689fae..c1cb3d8 100644
--- a/pulsar/consumer_multitopic.go
+++ b/pulsar/consumer_multitopic.go
@@ -164,6 +164,24 @@ func (c *multiTopicConsumer) ReconsumeLater(msg Message, delay time.Duration) {
 }
 
 func (c *multiTopicConsumer) Nack(msg Message) {
+	if c.options.EnableDefaultNackBackoffPolicy || c.options.NackBackoffPolicy != nil {
+		msgID := msg.ID()
+		mid, ok := toTrackingMessageID(msgID)
+		if !ok {
+			c.log.Warnf("invalid message id type %T", msgID)
+			return
+		}
+
+		// The nack is routed through the consumer recorded in the message ID; without it there is nowhere to track it.
+		if mid.consumer == nil {
+			c.log.Warnf("unable to nack messageID=%+v can not determine topic", msgID)
+			return
+		}
+		// Pass the whole message so the backoff policy can use its redelivery count.
+		mid.NackByMsg(msg)
+		return
+	}
+
 	c.NackID(msg.ID())
 }
 
diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go
index 30639bd..c8e5d9f 100644
--- a/pulsar/consumer_partition.go
+++ b/pulsar/consumer_partition.go
@@ -89,6 +89,7 @@ type partitionConsumerOpts struct {
 	partitionIdx               int
 	receiverQueueSize          int
 	nackRedeliveryDelay        time.Duration
+	nackBackoffPolicy          NackBackoffPolicy
 	metadata                   map[string]string
 	replicateSubscriptionState bool
 	startMessageID             trackingMessageID
@@ -201,7 +202,7 @@ func newPartitionConsumer(parent Consumer, client *client, options *partitionCon
 
 	pc.decryptor = decryptor
 
-	pc.nackTracker = newNegativeAcksTracker(pc, options.nackRedeliveryDelay, pc.log)
+	pc.nackTracker = newNegativeAcksTracker(pc, options.nackRedeliveryDelay, options.nackBackoffPolicy, pc.log)
 
 	err := pc.grabConn()
 	if err != nil {
@@ -331,6 +332,13 @@ func (pc *partitionConsumer) NackID(msgID trackingMessageID) {
 	pc.metrics.NacksCounter.Inc()
 }
 
+// NackMsg negatively acknowledges msg by handing the whole message to the nack tracker, which can
+// then derive the redelivery delay from it rather than from the message ID alone.
+func (pc *partitionConsumer) NackMsg(msg Message) {
+	pc.nackTracker.AddMessage(msg)
+	pc.metrics.NacksCounter.Inc()
+}
+
 func (pc *partitionConsumer) Redeliver(msgIds []messageID) {
 	pc.eventsCh <- &redeliveryRequest{msgIds}
 
diff --git a/pulsar/consumer_regex.go b/pulsar/consumer_regex.go
index 2f46c48..ed2ae1a 100644
--- a/pulsar/consumer_regex.go
+++ b/pulsar/consumer_regex.go
@@ -183,6 +183,24 @@ func (c *regexConsumer) AckID(msgID MessageID) {
 }
 
 func (c *regexConsumer) Nack(msg Message) {
+	if c.options.EnableDefaultNackBackoffPolicy || c.options.NackBackoffPolicy != nil {
+		msgID := msg.ID()
+		mid, ok := toTrackingMessageID(msgID)
+		if !ok {
+			c.log.Warnf("invalid message id type %T", msgID)
+			return
+		}
+
+		// The nack is routed through the consumer recorded in the message ID; without it there is nowhere to track it.
+		if mid.consumer == nil {
+			c.log.Warnf("unable to nack messageID=%+v can not determine topic", msgID)
+			return
+		}
+		// Pass the whole message so the backoff policy can use its redelivery count.
+		mid.NackByMsg(msg)
+		return
+	}
+
 	c.NackID(msg.ID())
 }
 
diff --git a/pulsar/impl_message.go b/pulsar/impl_message.go
index 19fa6d8..a9809aa 100644
--- a/pulsar/impl_message.go
+++ b/pulsar/impl_message.go
@@ -79,6 +79,16 @@ func (id trackingMessageID) Nack() {
 	id.consumer.NackID(id)
 }
 
+// NackByMsg negatively acknowledges msg through the consumer recorded in this ID. Unlike Nack it hands
+// over the whole message, so a nack backoff policy can use its redelivery count. It does nothing when
+// the ID carries no consumer.
+func (id trackingMessageID) NackByMsg(msg Message) {
+	if id.consumer == nil {
+		return
+	}
+	id.consumer.NackMsg(msg)
+}
+
 func (id trackingMessageID) ack() bool {
 	if id.tracker != nil && id.batchIdx > -1 {
 		return id.tracker.ack(int(id.batchIdx))
diff --git a/pulsar/negative_acks_tracker.go b/pulsar/negative_acks_tracker.go
index e10ab49..3485e1b 100644
--- a/pulsar/negative_acks_tracker.go
+++ b/pulsar/negative_acks_tracker.go
@@ -35,21 +35,39 @@ type negativeAcksTracker struct {
 	doneOnce     sync.Once
 	negativeAcks map[messageID]time.Time
 	rc           redeliveryConsumer
+	nackBackoff  NackBackoffPolicy
 	tick         *time.Ticker
 	delay        time.Duration
 	log          log.Logger
 }
 
-func newNegativeAcksTracker(rc redeliveryConsumer, delay time.Duration, logger log.Logger) *negativeAcksTracker {
+// newNegativeAcksTracker starts a tracker that redelivers negatively acknowledged messages through rc.
+// Without a nackBackoffPolicy every nack is redelivered after delay; with one, delay is ignored and the
+// policy decides the redelivery delay.
+func newNegativeAcksTracker(rc redeliveryConsumer, delay time.Duration,
+	nackBackoffPolicy NackBackoffPolicy, logger log.Logger) *negativeAcksTracker {
 	t := &negativeAcksTracker{
 		doneCh:       make(chan interface{}),
 		negativeAcks: make(map[messageID]time.Time),
 		rc:           rc,
-		tick:         time.NewTicker(delay / 3),
-		delay:        delay,
+		nackBackoff:  nackBackoffPolicy,
 		log:          logger,
 	}
 
+	if nackBackoffPolicy != nil {
+		// Add (the NackID path) has no redelivery count, so it uses the policy's delay for the first retry.
+		t.delay = nackBackoffPolicyDelay(nackBackoffPolicy, 1)
+	} else {
+		t.delay = delay
+	}
+
+	// time.NewTicker panics on a non-positive interval, which tiny delays would otherwise produce.
+	tickInterval := t.delay / 3
+	if tickInterval <= 0 {
+		tickInterval = minNackTrackerTick
+	}
+	t.tick = time.NewTicker(tickInterval)
+
 	go t.track()
 	return t
 }
@@ -76,6 +94,44 @@ func (t *negativeAcksTracker) Add(msgID messageID) {
 	t.negativeAcks[batchMsgID] = targetTime
 }
 
+// minNackTrackerTick is the shortest interval at which the tracker scans for due redeliveries.
+const minNackTrackerTick = time.Millisecond
+
+// nackBackoffPolicyDelay converts the result of NackBackoffPolicy.Next, which is expressed in
+// milliseconds, into a time.Duration.
+func nackBackoffPolicyDelay(policy NackBackoffPolicy, redeliveryCount uint32) time.Duration {
+	return time.Duration(policy.Next(redeliveryCount)) * time.Millisecond
+}
+
+// AddMessage schedules the redelivery of the entry containing msg. With a NackBackoffPolicy the delay
+// is derived from msg.RedeliveryCount(); otherwise the tracker's fixed delay is used.
+func (t *negativeAcksTracker) AddMessage(msg Message) {
+	delay := t.delay
+	if t.nackBackoff != nil {
+		delay = nackBackoffPolicyDelay(t.nackBackoff, msg.RedeliveryCount())
+	}
+
+	msgID := msg.ID()
+
+	// Always clear up the batch index since we want to track the nack
+	// for the entire batch
+	batchMsgID := messageID{
+		ledgerID: msgID.LedgerID(),
+		entryID:  msgID.EntryID(),
+		batchIdx: 0,
+	}
+
+	t.Lock()
+	defer t.Unlock()
+
+	if _, present := t.negativeAcks[batchMsgID]; present {
+		// The batch is already being tracked
+		return
+	}
+
+	t.negativeAcks[batchMsgID] = time.Now().Add(delay)
+}
+
 func (t *negativeAcksTracker) track() {
 	for {
 		select {
@@ -105,7 +161,6 @@ func (t *negativeAcksTracker) track() {
 					t.rc.Redeliver(msgIds)
 				}
 			}
-
 		}
 	}
 }
diff --git a/pulsar/negative_acks_tracker_test.go b/pulsar/negative_acks_tracker_test.go
index e587f3f..51965ea 100644
--- a/pulsar/negative_acks_tracker_test.go
+++ b/pulsar/negative_acks_tracker_test.go
@@ -75,7 +75,7 @@ func (nmc *nackMockedConsumer) Wait() <-chan messageID {
 
 func TestNacksTracker(t *testing.T) {
 	nmc := newNackMockedConsumer()
-	nacks := newNegativeAcksTracker(nmc, testNackDelay, log.DefaultNopLogger())
+	nacks := newNegativeAcksTracker(nmc, testNackDelay, nil, log.DefaultNopLogger())
 
 	nacks.Add(messageID{
 		ledgerID: 1,
@@ -108,7 +108,7 @@ func TestNacksTracker(t *testing.T) {
 
 func TestNacksWithBatchesTracker(t *testing.T) {
 	nmc := newNackMockedConsumer()
-	nacks := newNegativeAcksTracker(nmc, testNackDelay, log.DefaultNopLogger())
+	nacks := newNegativeAcksTracker(nmc, testNackDelay, nil, log.DefaultNopLogger())
 
 	nacks.Add(messageID{
 		ledgerID: 1,
@@ -148,3 +148,103 @@ func TestNacksWithBatchesTracker(t *testing.T) {
 
 	nacks.Close()
 }
+
+// testNackBackoffPolicy keeps the first two attempts on the regular test delay and backs later
+// attempts off far beyond the lifetime of the mocked consumer.
+type testNackBackoffPolicy struct{}
+
+func (p *testNackBackoffPolicy) Next(redeliveryCount uint32) int64 {
+	if redeliveryCount <= 1 {
+		return int64(testNackDelay / time.Millisecond)
+	}
+	return int64(time.Hour / time.Millisecond)
+}
+
+func TestNackBackoffTracker(t *testing.T) {
+	nmc := newNackMockedConsumer()
+	nacks := newNegativeAcksTracker(nmc, testNackDelay, new(testNackBackoffPolicy), log.DefaultNopLogger())
+
+	nacks.AddMessage(&mockMessage{id: messageID{ledgerID: 1, entryID: 1, batchIdx: 1}, redeliveryCount: 0})
+	nacks.AddMessage(&mockMessage{id: messageID{ledgerID: 2, entryID: 2, batchIdx: 1}, redeliveryCount: 1})
+	// Backed off for an hour, so it must not be redelivered before the mocked consumer closes.
+	nacks.AddMessage(&mockMessage{id: messageID{ledgerID: 3, entryID: 3, batchIdx: 1}, redeliveryCount: 5})
+
+	msgIds := make([]messageID, 0)
+	for id := range nmc.Wait() {
+		msgIds = append(msgIds, id)
+	}
+	msgIds = sortMessageIds(msgIds)
+
+	assert.Equal(t, 2, len(msgIds))
+	assert.Equal(t, int64(1), msgIds[0].ledgerID)
+	assert.Equal(t, int64(1), msgIds[0].entryID)
+	assert.Equal(t, int64(2), msgIds[1].ledgerID)
+	assert.Equal(t, int64(2), msgIds[1].entryID)
+
+	nacks.Close()
+	// allow multiple Close without panicking
+	nacks.Close()
+}
+
+// mockMessage is a minimal Message whose ID and redelivery count are chosen by the test.
+type mockMessage struct {
+	id              messageID
+	redeliveryCount uint32
+	properties      map[string]string
+}
+
+func (msg *mockMessage) Topic() string {
+	return ""
+}
+
+func (msg *mockMessage) Properties() map[string]string {
+	return msg.properties
+}
+
+func (msg *mockMessage) Payload() []byte {
+	return nil
+}
+
+func (msg *mockMessage) ID() MessageID {
+	return msg.id
+}
+
+func (msg *mockMessage) PublishTime() time.Time {
+	return time.Time{}
+}
+
+func (msg *mockMessage) EventTime() time.Time {
+	return time.Time{}
+}
+
+func (msg *mockMessage) Key() string {
+	return ""
+}
+
+func (msg *mockMessage) OrderingKey() string {
+	return ""
+}
+
+func (msg *mockMessage) RedeliveryCount() uint32 {
+	return msg.redeliveryCount
+}
+
+func (msg *mockMessage) IsReplicated() bool {
+	return false
+}
+
+func (msg *mockMessage) GetReplicatedFrom() string {
+	return ""
+}
+
+func (msg *mockMessage) GetSchemaValue(v interface{}) error {
+	return nil
+}
+
+func (msg *mockMessage) ProducerName() string {
+	return ""
+}
+
+func (msg *mockMessage) GetEncryptionContext() *EncryptionContext {
+	return &EncryptionContext{}
+}
diff --git a/pulsar/negative_backoff_policy.go b/pulsar/negative_backoff_policy.go
new file mode 100644
index 0000000..cf080ad
--- /dev/null
+++ b/pulsar/negative_backoff_policy.go
@@ -0,0 +1,53 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package pulsar
+
+// NackBackoffPolicy is an interface for a custom redelivery policy for negatively acknowledged
+// messages. Users can specify a NackBackoffPolicy for a consumer.
+//
+// > Notice: a consumer crash triggers the redelivery of unacked messages; that case does not respect
+// > the NackBackoffPolicy, which means the message might get redelivered earlier than the delay time
+// > from the backoff.
+type NackBackoffPolicy interface {
+	// Next returns the redelivery delay, in milliseconds, for a message that has already been
+	// redelivered redeliveryCount times.
+	// We can get the redeliveryCount from the CommandMessage.
+	Next(redeliveryCount uint32) int64
+}
+
+// defaultNackBackoffPolicy is the default NackBackoffPolicy: it starts at 30 seconds and doubles the
+// delay on every redelivery, capped at 10 minutes.
+type defaultNackBackoffPolicy struct{}
+
+// Next implements NackBackoffPolicy.
+func (nbp *defaultNackBackoffPolicy) Next(redeliveryCount uint32) int64 {
+	minNackTimeMs := int64(1000 * 30)      // 30sec
+	maxNackTimeMs := int64(1000 * 60 * 10) // 10min
+
+	// Double once per redelivery and stop as soon as the cap is reached. Computing
+	// minNackTimeMs<<redeliveryCount directly overflows int64 for large counts and wraps to
+	// zero, which would redeliver a repeatedly failing message with no delay at all.
+	delay := minNackTimeMs
+	for i := uint32(0); i < redeliveryCount && delay < maxNackTimeMs; i++ {
+		delay <<= 1
+	}
+	if delay > maxNackTimeMs {
+		delay = maxNackTimeMs
+	}
+	return delay
+}
diff --git a/pulsar/negative_backoff_policy_test.go b/pulsar/negative_backoff_policy_test.go
new file mode 100644
index 0000000..bfbb6a8
--- /dev/null
+++ b/pulsar/negative_backoff_policy_test.go
@@ -0,0 +1,42 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package pulsar
+
+import (
+	"math"
+	"testing"
+
+	"github.com/stretchr/testify/assert"
+)
+
+func TestDefaultNackBackoffPolicy_Next(t *testing.T) {
+	defaultNackBackoff := new(defaultNackBackoffPolicy)
+
+	res0 := defaultNackBackoff.Next(0)
+	assert.Equal(t, int64(1000*30), res0)
+
+	res1 := defaultNackBackoff.Next(1)
+	assert.Equal(t, int64(1000*60), res1)
+
+	res5 := defaultNackBackoff.Next(5)
+	assert.Equal(t, int64(600000), res5)
+
+	// Large redelivery counts must stay at the cap instead of overflowing to a zero delay.
+	assert.Equal(t, int64(600000), defaultNackBackoff.Next(64))
+	assert.Equal(t, int64(600000), defaultNackBackoff.Next(math.MaxUint32))
+}