This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new 6a8847f  Parameterize the reconnection option (#853)
6a8847f is described below

commit 6a8847f34dfdef27740ac619ff9aeb3fd1e85afb
Author: xiaolong ran <[email protected]>
AuthorDate: Thu Sep 29 10:40:24 2022 +0800

    Parameterize the reconnection option (#853)
---
 pulsar/consumer.go                |  6 ++++++
 pulsar/consumer_impl.go           |  1 +
 pulsar/consumer_partition.go      | 24 ++++++++++++++++--------
 pulsar/dlq_router.go              |  2 +-
 pulsar/internal/backoff.go        | 25 +++++++++++++++----------
 pulsar/internal/backoff_test.go   |  6 +++---
 pulsar/internal/http_client.go    |  2 +-
 pulsar/internal/rpc_client.go     |  2 +-
 pulsar/negative_backoff_policy.go |  2 +-
 pulsar/producer.go                |  6 ++++++
 pulsar/producer_partition.go      | 23 ++++++++++++++---------
 pulsar/retry_router.go            |  2 +-
 12 files changed, 66 insertions(+), 35 deletions(-)

diff --git a/pulsar/consumer.go b/pulsar/consumer.go
index 70d67ba..3f756f8 100644
--- a/pulsar/consumer.go
+++ b/pulsar/consumer.go
@@ -20,6 +20,8 @@ package pulsar
 import (
        "context"
        "time"
+
+       "github.com/apache/pulsar-client-go/pulsar/internal"
 )
 
 // ConsumerMessage represents a pair of a Consumer and Message.
@@ -171,6 +173,10 @@ type ConsumerOptions struct {
        // MaxReconnectToBroker sets the maximum retry number of reconnectToBroker. (default: ultimate)
        MaxReconnectToBroker *uint
 
+       // BackoffPolicy parameterize the following options in the reconnection logic to
+       // allow users to customize the reconnection logic (minBackoff, maxBackoff and jitterPercentage)
+       BackoffPolicy internal.BackoffPolicy
+
        // Decryption represents the encryption related fields required by the consumer to decrypt a message.
        Decryption *MessageDecryptionInfo
 
diff --git a/pulsar/consumer_impl.go b/pulsar/consumer_impl.go
index e6135bf..517824d 100644
--- a/pulsar/consumer_impl.go
+++ b/pulsar/consumer_impl.go
@@ -361,6 +361,7 @@ func (c *consumer) internalTopicSubscribeToPartitions() error {
                                readCompacted:              c.options.ReadCompacted,
                                interceptors:               c.options.Interceptors,
                                maxReconnectToBroker:       c.options.MaxReconnectToBroker,
+                               backoffPolicy:              c.options.BackoffPolicy,
                                keySharedPolicy:            c.options.KeySharedPolicy,
                                schema:                     c.options.Schema,
                                decryption:                 c.options.Decryption,
diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go
index cc9e710..7ddff5e 100644
--- a/pulsar/consumer_partition.go
+++ b/pulsar/consumer_partition.go
@@ -102,6 +102,7 @@ type partitionConsumerOpts struct {
        disableForceTopicCreation  bool
        interceptors               ConsumerInterceptors
        maxReconnectToBroker       *uint
+       backoffPolicy              internal.BackoffPolicy
        keySharedPolicy            *KeySharedPolicy
        schema                     Schema
        decryption                 *MessageDecryptionInfo
@@ -1143,10 +1144,7 @@ func (pc *partitionConsumer) internalClose(req *closeRequest) {
 }
 
 func (pc *partitionConsumer) reconnectToBroker() {
-       var (
-               maxRetry int
-               backoff  = internal.Backoff{}
-       )
+       var maxRetry int
 
        if pc.options.maxReconnectToBroker == nil {
                maxRetry = -1
@@ -1161,9 +1159,19 @@ func (pc *partitionConsumer) reconnectToBroker() {
                        return
                }
 
-               d := backoff.Next()
-               pc.log.Info("Reconnecting to broker in ", d)
-               time.Sleep(d)
+               var (
+                       delayReconnectTime time.Duration
+                       defaultBackoff     = internal.DefaultBackoff{}
+               )
+
+               if pc.options.backoffPolicy == nil {
+                       delayReconnectTime = defaultBackoff.Next()
+               } else {
+                       delayReconnectTime = pc.options.backoffPolicy.Next()
+               }
+
+               pc.log.Info("Reconnecting to broker in ", delayReconnectTime)
+               time.Sleep(delayReconnectTime)
 
                err := pc.grabConn()
                if err == nil {
@@ -1183,7 +1191,7 @@ func (pc *partitionConsumer) reconnectToBroker() {
                        maxRetry--
                }
                pc.metrics.ConsumersReconnectFailure.Inc()
-               if maxRetry == 0 || backoff.IsMaxBackoffReached() {
+               if maxRetry == 0 || defaultBackoff.IsMaxBackoffReached() {
                        pc.metrics.ConsumersReconnectMaxRetry.Inc()
                }
        }
diff --git a/pulsar/dlq_router.go b/pulsar/dlq_router.go
index 966bff1..000faaa 100644
--- a/pulsar/dlq_router.go
+++ b/pulsar/dlq_router.go
@@ -133,7 +133,7 @@ func (r *dlqRouter) getProducer(schema Schema) Producer {
        }
 
        // Retry to create producer indefinitely
-       backoff := &internal.Backoff{}
+       backoff := &internal.DefaultBackoff{}
        for {
                opt := r.policy.ProducerOptions
                opt.Topic = r.policy.DeadLetterTopic
diff --git a/pulsar/internal/backoff.go b/pulsar/internal/backoff.go
index ff9b0bc..3284fb7 100644
--- a/pulsar/internal/backoff.go
+++ b/pulsar/internal/backoff.go
@@ -26,20 +26,25 @@ func init() {
        rand.Seed(time.Now().UnixNano())
 }
 
-// Backoff computes the delay before retrying an action.
+// BackoffPolicy parameterize the following options in the reconnection logic to
+// allow users to customize the reconnection logic (minBackoff, maxBackoff and jitterPercentage)
+type BackoffPolicy interface {
+       Next() time.Duration
+}
+
+// DefaultBackoff computes the delay before retrying an action.
 // It uses an exponential backoff with jitter. The jitter represents up to 20 percents of the delay.
-type Backoff struct {
+type DefaultBackoff struct {
        backoff time.Duration
 }
 
-const (
-       minBackoff       = 100 * time.Millisecond
-       maxBackoff       = 60 * time.Second
-       jitterPercentage = 0.2
-)
+const maxBackoff = 60 * time.Second
 
 // Next returns the delay to wait before next retry
-func (b *Backoff) Next() time.Duration {
+func (b *DefaultBackoff) Next() time.Duration {
+       minBackoff := 100 * time.Millisecond
+       jitterPercentage := 0.2
+
        // Double the delay each time
        b.backoff += b.backoff
        if b.backoff.Nanoseconds() < minBackoff.Nanoseconds() {
@@ -52,7 +57,7 @@ func (b *Backoff) Next() time.Duration {
        return b.backoff + time.Duration(jitter)
 }
 
-// IsMaxBackReached evaluates if the max number of retries is reached
-func (b *Backoff) IsMaxBackoffReached() bool {
+// IsMaxBackoffReached evaluates if the max number of retries is reached
+func (b *DefaultBackoff) IsMaxBackoffReached() bool {
        return b.backoff >= maxBackoff
 }
diff --git a/pulsar/internal/backoff_test.go b/pulsar/internal/backoff_test.go
index ad6e764..e05ea29 100644
--- a/pulsar/internal/backoff_test.go
+++ b/pulsar/internal/backoff_test.go
@@ -25,14 +25,14 @@ import (
 )
 
 func TestBackoff_NextMinValue(t *testing.T) {
-       backoff := &Backoff{}
+       backoff := &DefaultBackoff{}
        delay := backoff.Next()
        assert.GreaterOrEqual(t, int64(delay), int64(100*time.Millisecond))
        assert.LessOrEqual(t, int64(delay), int64(120*time.Millisecond))
 }
 
 func TestBackoff_NextExponentialBackoff(t *testing.T) {
-       backoff := &Backoff{}
+       backoff := &DefaultBackoff{}
        previousDelay := backoff.Next()
        // the last value before capping to the max value is 51.2 s (.1, .2, .4, .8, 1.6, 3.2, 6.4, 12.8, 25.6, 51.2)
        for previousDelay < 51*time.Second {
@@ -47,7 +47,7 @@ func TestBackoff_NextExponentialBackoff(t *testing.T) {
 }
 
 func TestBackoff_NextMaxValue(t *testing.T) {
-       backoff := &Backoff{}
+       backoff := &DefaultBackoff{}
        var delay time.Duration
        for delay < maxBackoff {
                delay = backoff.Next()
diff --git a/pulsar/internal/http_client.go b/pulsar/internal/http_client.go
index c160623..8c494dc 100644
--- a/pulsar/internal/http_client.go
+++ b/pulsar/internal/http_client.go
@@ -148,7 +148,7 @@ func (c *httpClient) Get(endpoint string, obj interface{}, params map[string]str
        if _, ok := err.(*url.Error); ok {
                // We can retry this kind of requests over a connection error because they're
                // not specific to a particular broker.
-               backoff := Backoff{100 * time.Millisecond}
+               backoff := DefaultBackoff{100 * time.Millisecond}
                startTime := time.Now()
                var retryTime time.Duration
 
diff --git a/pulsar/internal/rpc_client.go b/pulsar/internal/rpc_client.go
index 24506ef..4ef2995 100644
--- a/pulsar/internal/rpc_client.go
+++ b/pulsar/internal/rpc_client.go
@@ -91,7 +91,7 @@ func (c *rpcClient) RequestToAnyBroker(requestID uint64, cmdType pb.BaseCommand_
        var host *url.URL
        var rpcResult *RPCResult
        startTime := time.Now()
-       backoff := Backoff{100 * time.Millisecond}
+       backoff := DefaultBackoff{100 * time.Millisecond}
        // we can retry these requests because this kind of request is
        // not specific to any particular broker
        for time.Since(startTime) < c.requestTimeout {
diff --git a/pulsar/negative_backoff_policy.go b/pulsar/negative_backoff_policy.go
index be72bfa..5cd35bc 100644
--- a/pulsar/negative_backoff_policy.go
+++ b/pulsar/negative_backoff_policy.go
@@ -29,7 +29,7 @@ import (
 // > NackBackoffPolicy, which means the message might get redelivered earlier than the delay time
 // > from the backoff.
 type NackBackoffPolicy interface {
-       // The redeliveryCount indicates the number of times the message was redelivered.
+       // Next param redeliveryCount indicates the number of times the message was redelivered.
        // We can get the redeliveryCount from the CommandMessage.
        Next(redeliveryCount uint32) time.Duration
 }
diff --git a/pulsar/producer.go b/pulsar/producer.go
index fd68631..b4e43bd 100644
--- a/pulsar/producer.go
+++ b/pulsar/producer.go
@@ -20,6 +20,8 @@ package pulsar
 import (
        "context"
        "time"
+
+       "github.com/apache/pulsar-client-go/pulsar/internal"
 )
 
 type HashingScheme int
@@ -155,6 +157,10 @@ type ProducerOptions struct {
        // MaxReconnectToBroker specifies the maximum retry number of reconnectToBroker. (default: ultimate)
        MaxReconnectToBroker *uint
 
+       // BackoffPolicy parameterize the following options in the reconnection logic to
+       // allow users to customize the reconnection logic (minBackoff, maxBackoff and jitterPercentage)
+       BackoffPolicy internal.BackoffPolicy
+
        // BatcherBuilderType sets the batch builder type (default DefaultBatchBuilder)
        // This will be used to create batch container when batching is enabled.
        // Options:
diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index 45d2aa9..922d89d 100644
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -380,11 +380,7 @@ func (p *partitionProducer) getOrCreateSchema(schemaInfo *SchemaInfo) (schemaVer
 }
 
 func (p *partitionProducer) reconnectToBroker() {
-       var (
-               maxRetry int
-               backoff  = internal.Backoff{}
-       )
-
+       var maxRetry int
        if p.options.MaxReconnectToBroker == nil {
                maxRetry = -1
        } else {
@@ -398,9 +394,18 @@ func (p *partitionProducer) reconnectToBroker() {
                        return
                }
 
-               d := backoff.Next()
-               p.log.Info("Reconnecting to broker in ", d)
-               time.Sleep(d)
+               var (
+                       delayReconnectTime time.Duration
+                       defaultBackoff     = internal.DefaultBackoff{}
+               )
+
+               if p.options.BackoffPolicy == nil {
+                       delayReconnectTime = defaultBackoff.Next()
+               } else {
+                       delayReconnectTime = p.options.BackoffPolicy.Next()
+               }
+               p.log.Info("Reconnecting to broker in ", delayReconnectTime)
+               time.Sleep(delayReconnectTime)
                atomic.AddUint64(&p.epoch, 1)
                err := p.grabCnx()
                if err == nil {
@@ -420,7 +425,7 @@ func (p *partitionProducer) reconnectToBroker() {
                        maxRetry--
                }
                p.metrics.ProducersReconnectFailure.Inc()
-               if maxRetry == 0 || backoff.IsMaxBackoffReached() {
+               if maxRetry == 0 || defaultBackoff.IsMaxBackoffReached() {
                        p.metrics.ProducersReconnectMaxRetry.Inc()
                }
        }
diff --git a/pulsar/retry_router.go b/pulsar/retry_router.go
index 4d19ce2..7b5f6b8 100644
--- a/pulsar/retry_router.go
+++ b/pulsar/retry_router.go
@@ -123,7 +123,7 @@ func (r *retryRouter) getProducer() Producer {
        }
 
        // Retry to create producer indefinitely
-       backoff := &internal.Backoff{}
+       backoff := &internal.DefaultBackoff{}
        for {
                opt := r.policy.ProducerOptions
                opt.Topic = r.policy.RetryLetterTopic

Reply via email to