This is an automated email from the ASF dual-hosted git repository.
zike pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new 630d5f82 [fix] Reconnection logic and Backoff policy doesn't work
correctly (#1197)
630d5f82 is described below
commit 630d5f8218e00fe248d745c63ad0905527f45e66
Author: crossoverJie <[email protected]>
AuthorDate: Mon Sep 23 18:16:35 2024 +0800
[fix] Reconnection logic and Backoff policy doesn't work correctly (#1197)
Fixes #1187
### Modifications
- Move `backoff.go` to the new `backoff` directory (it is not moved into the
`pulsar` directory itself because that would create circular dependencies.)
- Add new methods to the `BackoffPolicy` interface (renamed to `Policy`):
`IsMaxBackoffReached() bool` and `Reset()`
This is a **breaking change** that modifies the package name and interface
name.
Package: `internal`->`backoff`
Interface name: `BackoffPolicy`-> `Policy`
---------
Co-authored-by: Zixuan Liu <[email protected]>
Co-authored-by: Zike Yang <[email protected]>
---
pulsar/{internal => backoff}/backoff.go | 24 ++++++++++++--
pulsar/{internal => backoff}/backoff_test.go | 4 ++-
pulsar/blue_green_migration_test.go | 15 +++++----
pulsar/consumer.go | 6 ++--
pulsar/consumer_impl.go | 7 +++--
pulsar/consumer_partition.go | 41 +++++++++++++-----------
pulsar/consumer_regex_test.go | 10 +++---
pulsar/consumer_test.go | 16 ++++++----
pulsar/dlq_router.go | 47 +++++++++++++++++-----------
pulsar/internal/http_client.go | 6 ++--
pulsar/internal/rpc_client.go | 6 ++--
pulsar/producer.go | 6 ++--
pulsar/producer_partition.go | 47 +++++++++++++++++-----------
pulsar/producer_test.go | 20 +++++++-----
pulsar/reader.go | 6 ++--
pulsar/reader_impl.go | 7 +++--
pulsar/reader_test.go | 23 ++++++++++----
pulsar/retry_router.go | 36 +++++++++++++--------
pulsar/transaction_coordinator_client.go | 4 ++-
19 files changed, 207 insertions(+), 124 deletions(-)
diff --git a/pulsar/internal/backoff.go b/pulsar/backoff/backoff.go
similarity index 77%
rename from pulsar/internal/backoff.go
rename to pulsar/backoff/backoff.go
index 3284fb7e..453da578 100644
--- a/pulsar/internal/backoff.go
+++ b/pulsar/backoff/backoff.go
@@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.
-package internal
+package backoff
import (
"math/rand"
@@ -26,10 +26,17 @@ func init() {
rand.Seed(time.Now().UnixNano())
}
-// BackoffPolicy parameterize the following options in the reconnection logic
to
+// Policy parameterize the following options in the reconnection logic to
// allow users to customize the reconnection logic (minBackoff, maxBackoff and
jitterPercentage)
-type BackoffPolicy interface {
+type Policy interface {
+ // Next returns the delay to wait before next retry
Next() time.Duration
+
+ // IsMaxBackoffReached reports whether the policy has reached its max backoff
+ IsMaxBackoffReached() bool
+
+ // Reset the backoff to the initial state
+ Reset()
}
// DefaultBackoff computes the delay before retrying an action.
@@ -38,6 +45,13 @@ type DefaultBackoff struct {
backoff time.Duration
}
+func NewDefaultBackoff() Policy {
+ return &DefaultBackoff{}
+}
+func NewDefaultBackoffWithInitialBackOff(backoff time.Duration) Policy {
+ return &DefaultBackoff{backoff: backoff / 2}
+}
+
const maxBackoff = 60 * time.Second
// Next returns the delay to wait before next retry
@@ -61,3 +75,7 @@ func (b *DefaultBackoff) Next() time.Duration {
func (b *DefaultBackoff) IsMaxBackoffReached() bool {
return b.backoff >= maxBackoff
}
+
+func (b *DefaultBackoff) Reset() {
+ b.backoff = 0
+}
diff --git a/pulsar/internal/backoff_test.go b/pulsar/backoff/backoff_test.go
similarity index 96%
rename from pulsar/internal/backoff_test.go
rename to pulsar/backoff/backoff_test.go
index e05ea292..fc0a4923 100644
--- a/pulsar/internal/backoff_test.go
+++ b/pulsar/backoff/backoff_test.go
@@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.
-package internal
+package backoff
import (
"testing"
@@ -58,4 +58,6 @@ func TestBackoff_NextMaxValue(t *testing.T) {
assert.Equal(t, true, backoff.IsMaxBackoffReached())
// max value is 60 seconds + 20% jitter = 72 seconds
assert.LessOrEqual(t, int64(cappedDelay), int64(72*time.Second))
+ backoff.Reset()
+ assert.Equal(t, false, backoff.IsMaxBackoffReached())
}
diff --git a/pulsar/blue_green_migration_test.go
b/pulsar/blue_green_migration_test.go
index 91667e8d..672ef343 100644
--- a/pulsar/blue_green_migration_test.go
+++ b/pulsar/blue_green_migration_test.go
@@ -54,8 +54,8 @@ func (suite *BlueGreenMigrationTestSuite)
TestTopicMigration() {
for _, scenario := range []topicUnloadTestCase{
{
- testCaseName: "proxyConnection",
- blueAdminURL: "http://localhost:8080",
+ testCaseName: "proxyConnection",
+ blueAdminURL: "http://localhost:8080",
blueClientUrl: "pulsar://localhost:6650",
greenAdminURL: "http://localhost:8081",
migrationBody: `
@@ -83,17 +83,17 @@ func testTopicMigrate(
migrationBody string) {
runtime.GOMAXPROCS(1)
const (
- cluster = "cluster-a"
+ cluster = "cluster-a"
tenant = utils.PUBLICTENANT
namespace = utils.DEFAULTNAMESPACE
- blueBroker1URL = "pulsar://broker-1:6650"
- blueBroker2URL = "pulsar://broker-2:6650"
+ blueBroker1URL = "pulsar://broker-1:6650"
+ blueBroker2URL = "pulsar://broker-2:6650"
greenBroker1URL = "pulsar://green-broker-1:6650"
greenBroker2URL = "pulsar://green-broker-2:6650"
- blueBroker1LookupURL = "broker-1:8080"
- blueBroker2LookupURL = "broker-2:8080"
+ blueBroker1LookupURL = "broker-1:8080"
+ blueBroker2LookupURL = "broker-2:8080"
greenBroker1LookupURL = "green-broker-1:8080"
greenBroker2LookupURL = "green-broker-2:8080"
)
@@ -234,7 +234,6 @@ func testTopicMigrate(
req.NoError(err)
req.NotEmpty(bundleRange)
-
unloadURL := fmt.Sprintf(
"/admin/v2/namespaces/%s/%s/%s/unload?destinationBroker=%s",
tenant, namespace, bundleRange, dstTopicBrokerLookupURL)
diff --git a/pulsar/consumer.go b/pulsar/consumer.go
index bf2eafbf..880cad56 100644
--- a/pulsar/consumer.go
+++ b/pulsar/consumer.go
@@ -21,7 +21,7 @@ import (
"context"
"time"
- "github.com/apache/pulsar-client-go/pulsar/internal"
+ "github.com/apache/pulsar-client-go/pulsar/backoff"
)
// ConsumerMessage represents a pair of a Consumer and Message.
@@ -207,9 +207,9 @@ type ConsumerOptions struct {
// MaxReconnectToBroker sets the maximum retry number of
reconnectToBroker. (default: ultimate)
MaxReconnectToBroker *uint
- // BackoffPolicy parameterize the following options in the reconnection
logic to
+ // BackOffPolicyFunc parameterize the following options in the
reconnection logic to
// allow users to customize the reconnection logic (minBackoff,
maxBackoff and jitterPercentage)
- BackoffPolicy internal.BackoffPolicy
+ BackOffPolicyFunc func() backoff.Policy
// Decryption represents the encryption related fields required by the
consumer to decrypt a message.
Decryption *MessageDecryptionInfo
diff --git a/pulsar/consumer_impl.go b/pulsar/consumer_impl.go
index 679054a0..a3d3e3ff 100644
--- a/pulsar/consumer_impl.go
+++ b/pulsar/consumer_impl.go
@@ -174,11 +174,12 @@ func newConsumer(client *client, options ConsumerOptions)
(Consumer, error) {
}
}
- dlq, err := newDlqRouter(client, options.DLQ, options.Topic,
options.SubscriptionName, options.Name, client.log)
+ dlq, err := newDlqRouter(client, options.DLQ, options.Topic,
options.SubscriptionName, options.Name,
+ options.BackOffPolicyFunc, client.log)
if err != nil {
return nil, err
}
- rlq, err := newRetryRouter(client, options.DLQ, options.RetryEnable,
client.log)
+ rlq, err := newRetryRouter(client, options.DLQ, options.RetryEnable,
options.BackOffPolicyFunc, client.log)
if err != nil {
return nil, err
}
@@ -453,7 +454,7 @@ func newPartitionConsumerOpts(topic, consumerName string,
idx int, options Consu
readCompacted: options.ReadCompacted,
interceptors: options.Interceptors,
maxReconnectToBroker: options.MaxReconnectToBroker,
- backoffPolicy: options.BackoffPolicy,
+ backOffPolicyFunc: options.BackOffPolicyFunc,
keySharedPolicy: options.KeySharedPolicy,
schema: options.Schema,
decryption: options.Decryption,
diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go
index f307972c..16aef5d4 100644
--- a/pulsar/consumer_partition.go
+++ b/pulsar/consumer_partition.go
@@ -26,6 +26,8 @@ import (
"sync"
"time"
+ "github.com/apache/pulsar-client-go/pulsar/backoff"
+
"google.golang.org/protobuf/proto"
"github.com/apache/pulsar-client-go/pulsar/crypto"
@@ -110,7 +112,7 @@ type partitionConsumerOpts struct {
disableForceTopicCreation bool
interceptors ConsumerInterceptors
maxReconnectToBroker *uint
- backoffPolicy internal.BackoffPolicy
+ backOffPolicyFunc func() backoff.Policy
keySharedPolicy *KeySharedPolicy
schema Schema
decryption *MessageDecryptionInfo
@@ -182,6 +184,7 @@ type partitionConsumer struct {
lastMessageInBroker *trackingMessageID
redirectedClusterURI string
+ backoffPolicyFunc func() backoff.Policy
}
func (pc *partitionConsumer) ActiveConsumerChanged(isActive bool) {
@@ -318,6 +321,13 @@ func (s *schemaInfoCache) add(schemaVersionHash string,
schema Schema) {
func newPartitionConsumer(parent Consumer, client *client, options
*partitionConsumerOpts,
messageCh chan ConsumerMessage, dlq *dlqRouter,
metrics *internal.LeveledMetrics) (*partitionConsumer, error) {
+ var boFunc func() backoff.Policy
+ if options.backOffPolicyFunc != nil {
+ boFunc = options.backOffPolicyFunc
+ } else {
+ boFunc = backoff.NewDefaultBackoff
+ }
+
pc := &partitionConsumer{
parentConsumer: parent,
client: client,
@@ -339,6 +349,7 @@ func newPartitionConsumer(parent Consumer, client *client,
options *partitionCon
dlq: dlq,
metrics: metrics,
schemaInfoCache: newSchemaInfoCache(client, options.topic),
+ backoffPolicyFunc: boFunc,
}
if pc.options.autoReceiverQueueSize {
pc.currentQueueSize.Store(initialReceiverQueueSize)
@@ -581,12 +592,7 @@ func (pc *partitionConsumer) getLastMessageID()
(*trackingMessageID, error) {
return nil, errors.New("failed to getLastMessageID for the
closing or closed consumer")
}
remainTime := pc.client.operationTimeout
- var backoff internal.BackoffPolicy
- if pc.options.backoffPolicy != nil {
- backoff = pc.options.backoffPolicy
- } else {
- backoff = &internal.DefaultBackoff{}
- }
+ bo := pc.backoffPolicyFunc()
request := func() (*trackingMessageID, error) {
req := &getLastMsgIDRequest{doneCh: make(chan struct{})}
pc.eventsCh <- req
@@ -604,7 +610,7 @@ func (pc *partitionConsumer) getLastMessageID()
(*trackingMessageID, error) {
pc.log.WithError(err).Error("Failed to
getLastMessageID")
return nil, fmt.Errorf("failed to getLastMessageID due
to %w", err)
}
- nextDelay := backoff.Next()
+ nextDelay := bo.Next()
if nextDelay > remainTime {
nextDelay = remainTime
}
@@ -1684,18 +1690,17 @@ func (pc *partitionConsumer) internalClose(req
*closeRequest) {
}
func (pc *partitionConsumer) reconnectToBroker(connectionClosed
*connectionClosed) {
- var maxRetry int
+ var (
+ maxRetry int
+ delayReconnectTime, totalDelayReconnectTime time.Duration
+ )
if pc.options.maxReconnectToBroker == nil {
maxRetry = -1
} else {
maxRetry = int(*pc.options.maxReconnectToBroker)
}
-
- var (
- delayReconnectTime time.Duration
- defaultBackoff = internal.DefaultBackoff{}
- )
+ bo := pc.backoffPolicyFunc()
for maxRetry != 0 {
if pc.getConsumerState() != consumerReady {
@@ -1710,11 +1715,10 @@ func (pc *partitionConsumer)
reconnectToBroker(connectionClosed *connectionClose
delayReconnectTime = 0
assignedBrokerURL = connectionClosed.assignedBrokerURL
connectionClosed = nil // Attempt connecting to the
assigned broker just once
- } else if pc.options.backoffPolicy == nil {
- delayReconnectTime = defaultBackoff.Next()
} else {
- delayReconnectTime = pc.options.backoffPolicy.Next()
+ delayReconnectTime = bo.Next()
}
+ totalDelayReconnectTime += delayReconnectTime
pc.log.WithFields(log.Fields{
"assignedBrokerURL": assignedBrokerURL,
@@ -1733,6 +1737,7 @@ func (pc *partitionConsumer)
reconnectToBroker(connectionClosed *connectionClose
if err == nil {
// Successfully reconnected
pc.log.Info("Reconnected consumer to broker")
+ bo.Reset()
return
}
pc.log.WithError(err).Error("Failed to create consumer at
reconnect")
@@ -1747,7 +1752,7 @@ func (pc *partitionConsumer)
reconnectToBroker(connectionClosed *connectionClose
maxRetry--
}
pc.metrics.ConsumersReconnectFailure.Inc()
- if maxRetry == 0 || defaultBackoff.IsMaxBackoffReached() {
+ if maxRetry == 0 || bo.IsMaxBackoffReached() {
pc.metrics.ConsumersReconnectMaxRetry.Inc()
}
}
diff --git a/pulsar/consumer_regex_test.go b/pulsar/consumer_regex_test.go
index 28ba4f72..e1e2ca29 100644
--- a/pulsar/consumer_regex_test.go
+++ b/pulsar/consumer_regex_test.go
@@ -159,8 +159,9 @@ func runRegexConsumerDiscoverPatternAll(t *testing.T, c
Client, namespace string
Name: "regex-consumer",
}
- dlq, _ := newDlqRouter(c.(*client), nil, tn.Topic, "regex-sub",
"regex-consumer", log.DefaultNopLogger())
- rlq, _ := newRetryRouter(c.(*client), nil, false,
log.DefaultNopLogger())
+ dlq, _ := newDlqRouter(c.(*client), nil, tn.Topic, "regex-sub",
"regex-consumer",
+ nil, log.DefaultNopLogger())
+ rlq, _ := newRetryRouter(c.(*client), nil, false, nil,
log.DefaultNopLogger())
consumer, err := newRegexConsumer(c.(*client), opts, tn, pattern,
make(chan ConsumerMessage, 1), dlq, rlq)
if err != nil {
t.Fatal(err)
@@ -198,8 +199,9 @@ func runRegexConsumerDiscoverPatternFoo(t *testing.T, c
Client, namespace string
Name: "regex-consumer",
}
- dlq, _ := newDlqRouter(c.(*client), nil, tn.Topic, "regex-sub",
"regex-consumer", log.DefaultNopLogger())
- rlq, _ := newRetryRouter(c.(*client), nil, false,
log.DefaultNopLogger())
+ dlq, _ := newDlqRouter(c.(*client), nil, tn.Topic, "regex-sub",
"regex-consumer",
+ nil, log.DefaultNopLogger())
+ rlq, _ := newRetryRouter(c.(*client), nil, false, nil,
log.DefaultNopLogger())
consumer, err := newRegexConsumer(c.(*client), opts, tn, pattern,
make(chan ConsumerMessage, 1), dlq, rlq)
if err != nil {
t.Fatal(err)
diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go
index 04439cbc..a4a8d995 100644
--- a/pulsar/consumer_test.go
+++ b/pulsar/consumer_test.go
@@ -30,6 +30,8 @@ import (
"testing"
"time"
+ "github.com/apache/pulsar-client-go/pulsar/backoff"
+
"github.com/apache/pulsar-client-go/pulsaradmin"
"github.com/apache/pulsar-client-go/pulsaradmin/pkg/admin/config"
"github.com/apache/pulsar-client-go/pulsaradmin/pkg/utils"
@@ -3874,12 +3876,14 @@ func TestConsumerWithBackoffPolicy(t *testing.T) {
topicName := newTopicName()
- backoff := newTestBackoffPolicy(1*time.Second, 4*time.Second)
+ bo := newTestBackoffPolicy(1*time.Second, 4*time.Second)
_consumer, err := client.Subscribe(ConsumerOptions{
Topic: topicName,
SubscriptionName: "sub-1",
Type: Shared,
- BackoffPolicy: backoff,
+ BackOffPolicyFunc: func() backoff.Policy {
+ return bo
+ },
})
assert.Nil(t, err)
defer _consumer.Close()
@@ -3888,22 +3892,22 @@ func TestConsumerWithBackoffPolicy(t *testing.T) {
// 1 s
startTime := time.Now()
partitionConsumerImp.reconnectToBroker(nil)
- assert.True(t, backoff.IsExpectedIntervalFrom(startTime))
+ assert.True(t, bo.IsExpectedIntervalFrom(startTime))
// 2 s
startTime = time.Now()
partitionConsumerImp.reconnectToBroker(nil)
- assert.True(t, backoff.IsExpectedIntervalFrom(startTime))
+ assert.True(t, bo.IsExpectedIntervalFrom(startTime))
// 4 s
startTime = time.Now()
partitionConsumerImp.reconnectToBroker(nil)
- assert.True(t, backoff.IsExpectedIntervalFrom(startTime))
+ assert.True(t, bo.IsExpectedIntervalFrom(startTime))
// 4 s
startTime = time.Now()
partitionConsumerImp.reconnectToBroker(nil)
- assert.True(t, backoff.IsExpectedIntervalFrom(startTime))
+ assert.True(t, bo.IsExpectedIntervalFrom(startTime))
}
func TestAckWithMessageID(t *testing.T) {
diff --git a/pulsar/dlq_router.go b/pulsar/dlq_router.go
index 647c022d..6b13b329 100644
--- a/pulsar/dlq_router.go
+++ b/pulsar/dlq_router.go
@@ -22,31 +22,40 @@ import (
"fmt"
"time"
- "github.com/apache/pulsar-client-go/pulsar/internal"
+ "github.com/apache/pulsar-client-go/pulsar/backoff"
+
"github.com/apache/pulsar-client-go/pulsar/log"
)
type dlqRouter struct {
- client Client
- producer Producer
- policy *DLQPolicy
- messageCh chan ConsumerMessage
- closeCh chan interface{}
- topicName string
- subscriptionName string
- consumerName string
- log log.Logger
+ client Client
+ producer Producer
+ policy *DLQPolicy
+ messageCh chan ConsumerMessage
+ closeCh chan interface{}
+ topicName string
+ subscriptionName string
+ consumerName string
+ backOffPolicyFunc func() backoff.Policy
+ log log.Logger
}
func newDlqRouter(client Client, policy *DLQPolicy, topicName,
subscriptionName, consumerName string,
- logger log.Logger) (*dlqRouter, error) {
+ backOffPolicyFunc func() backoff.Policy, logger log.Logger)
(*dlqRouter, error) {
+ var boFunc func() backoff.Policy
+ if backOffPolicyFunc != nil {
+ boFunc = backOffPolicyFunc
+ } else {
+ boFunc = backoff.NewDefaultBackoff
+ }
r := &dlqRouter{
- client: client,
- policy: policy,
- topicName: topicName,
- subscriptionName: subscriptionName,
- consumerName: consumerName,
- log: logger,
+ client: client,
+ policy: policy,
+ topicName: topicName,
+ subscriptionName: subscriptionName,
+ consumerName: consumerName,
+ backOffPolicyFunc: boFunc,
+ log: logger,
}
if policy != nil {
@@ -155,7 +164,7 @@ func (r *dlqRouter) getProducer(schema Schema) Producer {
}
// Retry to create producer indefinitely
- backoff := &internal.DefaultBackoff{}
+ bo := r.backOffPolicyFunc()
for {
opt := r.policy.ProducerOptions
opt.Topic = r.policy.DeadLetterTopic
@@ -174,7 +183,7 @@ func (r *dlqRouter) getProducer(schema Schema) Producer {
if err != nil {
r.log.WithError(err).Error("Failed to create DLQ
producer")
- time.Sleep(backoff.Next())
+ time.Sleep(bo.Next())
continue
} else {
r.producer = producer
diff --git a/pulsar/internal/http_client.go b/pulsar/internal/http_client.go
index e68bd17c..eea0101a 100644
--- a/pulsar/internal/http_client.go
+++ b/pulsar/internal/http_client.go
@@ -29,6 +29,8 @@ import (
"path"
"time"
+ "github.com/apache/pulsar-client-go/pulsar/backoff"
+
"github.com/apache/pulsar-client-go/pulsar/auth"
"github.com/apache/pulsar-client-go/pulsar/log"
@@ -148,12 +150,12 @@ func (c *httpClient) Get(endpoint string, obj
interface{}, params map[string]str
if _, ok := err.(*url.Error); ok {
// We can retry this kind of requests over a connection error
because they're
// not specific to a particular broker.
- backoff := DefaultBackoff{100 * time.Millisecond}
+ bo := backoff.NewDefaultBackoffWithInitialBackOff(100 *
time.Millisecond)
startTime := time.Now()
var retryTime time.Duration
for time.Since(startTime) < c.requestTimeout {
- retryTime = backoff.Next()
+ retryTime = bo.Next()
c.log.Debugf("Retrying httpRequest in {%v} with timeout
in {%v}", retryTime, c.requestTimeout)
time.Sleep(retryTime)
_, err = c.GetWithQueryParams(endpoint, obj, params,
true)
diff --git a/pulsar/internal/rpc_client.go b/pulsar/internal/rpc_client.go
index d2e3895e..0f993116 100644
--- a/pulsar/internal/rpc_client.go
+++ b/pulsar/internal/rpc_client.go
@@ -25,6 +25,8 @@ import (
"sync/atomic"
"time"
+ "github.com/apache/pulsar-client-go/pulsar/backoff"
+
"github.com/apache/pulsar-client-go/pulsar/auth"
"github.com/apache/pulsar-client-go/pulsar/log"
@@ -115,7 +117,7 @@ func (c *rpcClient) requestToHost(serviceNameResolver
*ServiceNameResolver,
var host *url.URL
var rpcResult *RPCResult
startTime := time.Now()
- backoff := DefaultBackoff{100 * time.Millisecond}
+ bo := backoff.NewDefaultBackoffWithInitialBackOff(100 *
time.Millisecond)
// we can retry these requests because this kind of request is
// not specific to any particular broker
for time.Since(startTime) < c.requestTimeout {
@@ -130,7 +132,7 @@ func (c *rpcClient) requestToHost(serviceNameResolver
*ServiceNameResolver,
break
}
- retryTime := backoff.Next()
+ retryTime := bo.Next()
c.log.Debugf("Retrying request in {%v} with timeout in {%v}",
retryTime, c.requestTimeout)
time.Sleep(retryTime)
}
diff --git a/pulsar/producer.go b/pulsar/producer.go
index 0ae51bd4..997f9c0d 100644
--- a/pulsar/producer.go
+++ b/pulsar/producer.go
@@ -21,7 +21,7 @@ import (
"context"
"time"
- "github.com/apache/pulsar-client-go/pulsar/internal"
+ "github.com/apache/pulsar-client-go/pulsar/backoff"
)
type HashingScheme int
@@ -171,9 +171,9 @@ type ProducerOptions struct {
// MaxReconnectToBroker specifies the maximum retry number of
reconnectToBroker. (default: ultimate)
MaxReconnectToBroker *uint
- // BackoffPolicy parameterize the following options in the reconnection
logic to
+ // BackOffPolicyFunc parameterize the following options in the
reconnection logic to
// allow users to customize the reconnection logic (minBackoff,
maxBackoff and jitterPercentage)
- BackoffPolicy internal.BackoffPolicy
+ BackOffPolicyFunc func() backoff.Policy
// BatcherBuilderType sets the batch builder type (default
DefaultBatchBuilder)
// This will be used to create batch container when batching is enabled.
diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index af5fb38a..f578ee4b 100755
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -28,6 +28,8 @@ import (
"sync/atomic"
"time"
+ "github.com/apache/pulsar-client-go/pulsar/backoff"
+
"github.com/apache/pulsar-client-go/pulsar/internal/compression"
internalcrypto
"github.com/apache/pulsar-client-go/pulsar/internal/crypto"
@@ -121,6 +123,7 @@ type partitionProducer struct {
redirectedClusterURI string
ctx context.Context
cancelFunc context.CancelFunc
+ backOffPolicyFunc func() backoff.Policy
}
type schemaCache struct {
@@ -147,6 +150,14 @@ func (s *schemaCache) Get(schema *SchemaInfo)
(schemaVersion []byte) {
func newPartitionProducer(client *client, topic string, options
*ProducerOptions, partitionIdx int,
metrics *internal.LeveledMetrics) (
*partitionProducer, error) {
+
+ var boFunc func() backoff.Policy
+ if options.BackOffPolicyFunc != nil {
+ boFunc = options.BackOffPolicyFunc
+ } else {
+ boFunc = backoff.NewDefaultBackoff
+ }
+
var batchingMaxPublishDelay time.Duration
if options.BatchingMaxPublishDelay != 0 {
batchingMaxPublishDelay = options.BatchingMaxPublishDelay
@@ -176,15 +187,16 @@ func newPartitionProducer(client *client, topic string,
options *ProducerOptions
batchFlushTicker: time.NewTicker(batchingMaxPublishDelay),
compressionProvider:
internal.GetCompressionProvider(pb.CompressionType(options.CompressionType),
compression.Level(options.CompressionLevel)),
- publishSemaphore:
internal.NewSemaphore(int32(maxPendingMessages)),
- pendingQueue: internal.NewBlockingQueue(maxPendingMessages),
- lastSequenceID: -1,
- partitionIdx: int32(partitionIdx),
- metrics: metrics,
- epoch: 0,
- schemaCache: newSchemaCache(),
- ctx: ctx,
- cancelFunc: cancelFunc,
+ publishSemaphore:
internal.NewSemaphore(int32(maxPendingMessages)),
+ pendingQueue:
internal.NewBlockingQueue(maxPendingMessages),
+ lastSequenceID: -1,
+ partitionIdx: int32(partitionIdx),
+ metrics: metrics,
+ epoch: 0,
+ schemaCache: newSchemaCache(),
+ ctx: ctx,
+ cancelFunc: cancelFunc,
+ backOffPolicyFunc: boFunc,
}
if p.options.DisableBatching {
p.batchFlushTicker.Stop()
@@ -458,17 +470,17 @@ func (p *partitionProducer) getOrCreateSchema(schemaInfo
*SchemaInfo) (schemaVer
}
func (p *partitionProducer) reconnectToBroker(connectionClosed
*connectionClosed) {
- var maxRetry int
+ var (
+ maxRetry int
+ delayReconnectTime time.Duration
+ )
if p.options.MaxReconnectToBroker == nil {
maxRetry = -1
} else {
maxRetry = int(*p.options.MaxReconnectToBroker)
}
- var (
- delayReconnectTime time.Duration
- defaultBackoff = internal.DefaultBackoff{}
- )
+ bo := p.backOffPolicyFunc()
for maxRetry != 0 {
select {
@@ -489,10 +501,8 @@ func (p *partitionProducer)
reconnectToBroker(connectionClosed *connectionClosed
delayReconnectTime = 0
assignedBrokerURL = connectionClosed.assignedBrokerURL
connectionClosed = nil // Only attempt once
- } else if p.options.BackoffPolicy == nil {
- delayReconnectTime = defaultBackoff.Next()
} else {
- delayReconnectTime = p.options.BackoffPolicy.Next()
+ delayReconnectTime = bo.Next()
}
p.log.WithFields(log.Fields{
@@ -513,6 +523,7 @@ func (p *partitionProducer)
reconnectToBroker(connectionClosed *connectionClosed
if err == nil {
// Successfully reconnected
p.log.WithField("cnx",
p._getConn().ID()).Info("Reconnected producer to broker")
+ bo.Reset()
return
}
p.log.WithError(err).Error("Failed to create producer at
reconnect")
@@ -546,7 +557,7 @@ func (p *partitionProducer)
reconnectToBroker(connectionClosed *connectionClosed
maxRetry--
}
p.metrics.ProducersReconnectFailure.Inc()
- if maxRetry == 0 || defaultBackoff.IsMaxBackoffReached() {
+ if maxRetry == 0 || bo.IsMaxBackoffReached() {
p.metrics.ProducersReconnectMaxRetry.Inc()
}
}
diff --git a/pulsar/producer_test.go b/pulsar/producer_test.go
index 5b23182d..afd1f09a 100644
--- a/pulsar/producer_test.go
+++ b/pulsar/producer_test.go
@@ -34,6 +34,8 @@ import (
"github.com/testcontainers/testcontainers-go"
"github.com/testcontainers/testcontainers-go/wait"
+ "github.com/apache/pulsar-client-go/pulsar/backoff"
+
"github.com/stretchr/testify/assert"
"google.golang.org/protobuf/proto"
@@ -1293,11 +1295,13 @@ func TestProducerWithBackoffPolicy(t *testing.T) {
topicName := newTopicName()
- backoff := newTestBackoffPolicy(1*time.Second, 4*time.Second)
+ bo := newTestBackoffPolicy(1*time.Second, 4*time.Second)
_producer, err := client.CreateProducer(ProducerOptions{
- Topic: topicName,
- SendTimeout: 2 * time.Second,
- BackoffPolicy: backoff,
+ Topic: topicName,
+ SendTimeout: 2 * time.Second,
+ BackOffPolicyFunc: func() backoff.Policy {
+ return bo
+ },
})
assert.Nil(t, err)
defer _producer.Close()
@@ -1306,22 +1310,22 @@ func TestProducerWithBackoffPolicy(t *testing.T) {
// 1 s
startTime := time.Now()
partitionProducerImp.reconnectToBroker(nil)
- assert.True(t, backoff.IsExpectedIntervalFrom(startTime))
+ assert.True(t, bo.IsExpectedIntervalFrom(startTime))
// 2 s
startTime = time.Now()
partitionProducerImp.reconnectToBroker(nil)
- assert.True(t, backoff.IsExpectedIntervalFrom(startTime))
+ assert.True(t, bo.IsExpectedIntervalFrom(startTime))
// 4 s
startTime = time.Now()
partitionProducerImp.reconnectToBroker(nil)
- assert.True(t, backoff.IsExpectedIntervalFrom(startTime))
+ assert.True(t, bo.IsExpectedIntervalFrom(startTime))
// 4 s
startTime = time.Now()
partitionProducerImp.reconnectToBroker(nil)
- assert.True(t, backoff.IsExpectedIntervalFrom(startTime))
+ assert.True(t, bo.IsExpectedIntervalFrom(startTime))
}
func TestSendContextExpired(t *testing.T) {
diff --git a/pulsar/reader.go b/pulsar/reader.go
index 4daa8890..98bde4e3 100644
--- a/pulsar/reader.go
+++ b/pulsar/reader.go
@@ -21,7 +21,7 @@ import (
"context"
"time"
- "github.com/apache/pulsar-client-go/pulsar/internal"
+ "github.com/apache/pulsar-client-go/pulsar/backoff"
)
// ReaderMessage packages Reader and Message as a struct to use.
@@ -89,9 +89,9 @@ type ReaderOptions struct {
// Schema represents the schema implementation.
Schema Schema
- // BackoffPolicy parameterize the following options in the reconnection
logic to
+ // BackoffPolicyFunc parameterize the following options in the
reconnection logic to
// allow users to customize the reconnection logic (minBackoff,
maxBackoff and jitterPercentage)
- BackoffPolicy internal.BackoffPolicy
+ BackoffPolicyFunc func() backoff.Policy
// MaxPendingChunkedMessage sets the maximum pending chunked messages.
(default: 100)
MaxPendingChunkedMessage int
diff --git a/pulsar/reader_impl.go b/pulsar/reader_impl.go
index 7cbae05c..f76255e2 100644
--- a/pulsar/reader_impl.go
+++ b/pulsar/reader_impl.go
@@ -112,7 +112,7 @@ func newReader(client *client, options ReaderOptions)
(Reader, error) {
ReplicateSubscriptionState: false,
Decryption: options.Decryption,
Schema: options.Schema,
- BackoffPolicy: options.BackoffPolicy,
+ BackOffPolicyFunc: options.BackoffPolicyFunc,
MaxPendingChunkedMessage: options.MaxPendingChunkedMessage,
ExpireTimeOfIncompleteChunk:
options.ExpireTimeOfIncompleteChunk,
AutoAckIncompleteChunk: options.AutoAckIncompleteChunk,
@@ -128,12 +128,13 @@ func newReader(client *client, options ReaderOptions)
(Reader, error) {
}
// Provide dummy dlq router with not dlq policy
- dlq, err := newDlqRouter(client, nil, options.Topic,
options.SubscriptionName, options.Name, client.log)
+ dlq, err := newDlqRouter(client, nil, options.Topic,
options.SubscriptionName, options.Name,
+ options.BackoffPolicyFunc, client.log)
if err != nil {
return nil, err
}
// Provide dummy rlq router with not dlq policy
- rlq, err := newRetryRouter(client, nil, false, client.log)
+ rlq, err := newRetryRouter(client, nil, false,
options.BackoffPolicyFunc, client.log)
if err != nil {
return nil, err
}
diff --git a/pulsar/reader_test.go b/pulsar/reader_test.go
index d00346fc..3c928c1d 100644
--- a/pulsar/reader_test.go
+++ b/pulsar/reader_test.go
@@ -23,6 +23,8 @@ import (
"testing"
"time"
+ "github.com/apache/pulsar-client-go/pulsar/backoff"
+
"github.com/apache/pulsar-client-go/pulsar/crypto"
"github.com/apache/pulsar-client-go/pulsaradmin"
"github.com/apache/pulsar-client-go/pulsaradmin/pkg/admin/config"
@@ -847,6 +849,13 @@ func (b *testBackoffPolicy) Next() time.Duration {
return b.curBackoff
}
+func (b *testBackoffPolicy) IsMaxBackoffReached() bool {
+ return false
+}
+
+func (b *testBackoffPolicy) Reset() {
+
+}
func (b *testBackoffPolicy) IsExpectedIntervalFrom(startTime time.Time) bool {
// Approximately equal to expected interval
@@ -866,11 +875,13 @@ func TestReaderWithBackoffPolicy(t *testing.T) {
assert.Nil(t, err)
defer client.Close()
- backoff := newTestBackoffPolicy(1*time.Second, 4*time.Second)
+ bo := newTestBackoffPolicy(1*time.Second, 4*time.Second)
_reader, err := client.CreateReader(ReaderOptions{
Topic: "my-topic",
StartMessageID: LatestMessageID(),
- BackoffPolicy: backoff,
+ BackoffPolicyFunc: func() backoff.Policy {
+ return bo
+ },
})
assert.NotNil(t, _reader)
assert.Nil(t, err)
@@ -879,22 +890,22 @@ func TestReaderWithBackoffPolicy(t *testing.T) {
// 1 s
startTime := time.Now()
partitionConsumerImp.reconnectToBroker(nil)
- assert.True(t, backoff.IsExpectedIntervalFrom(startTime))
+ assert.True(t, bo.IsExpectedIntervalFrom(startTime))
// 2 s
startTime = time.Now()
partitionConsumerImp.reconnectToBroker(nil)
- assert.True(t, backoff.IsExpectedIntervalFrom(startTime))
+ assert.True(t, bo.IsExpectedIntervalFrom(startTime))
// 4 s
startTime = time.Now()
partitionConsumerImp.reconnectToBroker(nil)
- assert.True(t, backoff.IsExpectedIntervalFrom(startTime))
+ assert.True(t, bo.IsExpectedIntervalFrom(startTime))
// 4 s
startTime = time.Now()
partitionConsumerImp.reconnectToBroker(nil)
- assert.True(t, backoff.IsExpectedIntervalFrom(startTime))
+ assert.True(t, bo.IsExpectedIntervalFrom(startTime))
}
func TestReaderGetLastMessageID(t *testing.T) {
diff --git a/pulsar/retry_router.go b/pulsar/retry_router.go
index 75792adc..c8aa0b94 100644
--- a/pulsar/retry_router.go
+++ b/pulsar/retry_router.go
@@ -21,7 +21,8 @@ import (
"context"
"time"
- "github.com/apache/pulsar-client-go/pulsar/internal"
+ "github.com/apache/pulsar-client-go/pulsar/backoff"
+
"github.com/apache/pulsar-client-go/pulsar/log"
)
@@ -44,19 +45,28 @@ type RetryMessage struct {
}
type retryRouter struct {
- client Client
- producer Producer
- policy *DLQPolicy
- messageCh chan RetryMessage
- closeCh chan interface{}
- log log.Logger
+ client Client
+ producer Producer
+ policy *DLQPolicy
+ messageCh chan RetryMessage
+ closeCh chan interface{}
+ backOffPolicyFunc func() backoff.Policy
+ log log.Logger
}
-func newRetryRouter(client Client, policy *DLQPolicy, retryEnabled bool,
logger log.Logger) (*retryRouter, error) {
+func newRetryRouter(client Client, policy *DLQPolicy, retryEnabled bool,
backOffPolicyFunc func() backoff.Policy,
+ logger log.Logger) (*retryRouter, error) {
+ var boFunc func() backoff.Policy
+ if backOffPolicyFunc != nil {
+ boFunc = backOffPolicyFunc
+ } else {
+ boFunc = backoff.NewDefaultBackoff
+ }
r := &retryRouter{
- client: client,
- policy: policy,
- log: logger,
+ client: client,
+ policy: policy,
+ backOffPolicyFunc: boFunc,
+ log: logger,
}
if policy != nil && retryEnabled {
@@ -124,7 +134,7 @@ func (r *retryRouter) getProducer() Producer {
}
// Retry to create producer indefinitely
- backoff := &internal.DefaultBackoff{}
+ bo := r.backOffPolicyFunc()
for {
opt := r.policy.ProducerOptions
opt.Topic = r.policy.RetryLetterTopic
@@ -138,7 +148,7 @@ func (r *retryRouter) getProducer() Producer {
if err != nil {
r.log.WithError(err).Error("Failed to create RLQ
producer")
- time.Sleep(backoff.Next())
+ time.Sleep(bo.Next())
continue
} else {
r.producer = producer
diff --git a/pulsar/transaction_coordinator_client.go
b/pulsar/transaction_coordinator_client.go
index 1449d698..afde5427 100644
--- a/pulsar/transaction_coordinator_client.go
+++ b/pulsar/transaction_coordinator_client.go
@@ -24,6 +24,8 @@ import (
"sync/atomic"
"time"
+ "github.com/apache/pulsar-client-go/pulsar/backoff"
+
"github.com/apache/pulsar-client-go/pulsar/internal"
pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto"
"github.com/apache/pulsar-client-go/pulsar/log"
@@ -143,7 +145,7 @@ func (t *transactionHandler) runEventsLoop() {
func (t *transactionHandler) reconnectToBroker() {
var delayReconnectTime time.Duration
- var defaultBackoff = internal.DefaultBackoff{}
+ var defaultBackoff = backoff.DefaultBackoff{}
for {
if t.getState() == txnHandlerClosed {