This is an automated email from the ASF dual-hosted git repository.

zike pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new 630d5f82 [fix] Reconnection logic and Backoff policy doesn't work 
correctly (#1197)
630d5f82 is described below

commit 630d5f8218e00fe248d745c63ad0905527f45e66
Author: crossoverJie <[email protected]>
AuthorDate: Mon Sep 23 18:16:35 2024 +0800

    [fix] Reconnection logic and Backoff policy doesn't work correctly (#1197)
    
    Fixes #1187
    
    ### Modifications
    
    - Move `backoff.go` to the new `pulsar/backoff` directory (it is not placed directly in the `pulsar` package because that would create a circular dependency with `pulsar/internal`, which also uses it.)
    - Add the methods `IsMaxBackoffReached() bool` and `Reset()` to the backoff policy interface (formerly `BackoffPolicy`, now `Policy`)
    
    This is a **breaking change** that modifies the package name and interface 
name.
    
    Package: `internal` -> `backoff`
    Interface name: `BackoffPolicy` -> `Policy`
    
    
    ---------
    
    Co-authored-by: Zixuan Liu <[email protected]>
    Co-authored-by: Zike Yang <[email protected]>
---
 pulsar/{internal => backoff}/backoff.go      | 24 ++++++++++++--
 pulsar/{internal => backoff}/backoff_test.go |  4 ++-
 pulsar/blue_green_migration_test.go          | 15 +++++----
 pulsar/consumer.go                           |  6 ++--
 pulsar/consumer_impl.go                      |  7 +++--
 pulsar/consumer_partition.go                 | 41 +++++++++++++-----------
 pulsar/consumer_regex_test.go                | 10 +++---
 pulsar/consumer_test.go                      | 16 ++++++----
 pulsar/dlq_router.go                         | 47 +++++++++++++++++-----------
 pulsar/internal/http_client.go               |  6 ++--
 pulsar/internal/rpc_client.go                |  6 ++--
 pulsar/producer.go                           |  6 ++--
 pulsar/producer_partition.go                 | 47 +++++++++++++++++-----------
 pulsar/producer_test.go                      | 20 +++++++-----
 pulsar/reader.go                             |  6 ++--
 pulsar/reader_impl.go                        |  7 +++--
 pulsar/reader_test.go                        | 23 ++++++++++----
 pulsar/retry_router.go                       | 36 +++++++++++++--------
 pulsar/transaction_coordinator_client.go     |  4 ++-
 19 files changed, 207 insertions(+), 124 deletions(-)

diff --git a/pulsar/internal/backoff.go b/pulsar/backoff/backoff.go
similarity index 77%
rename from pulsar/internal/backoff.go
rename to pulsar/backoff/backoff.go
index 3284fb7e..453da578 100644
--- a/pulsar/internal/backoff.go
+++ b/pulsar/backoff/backoff.go
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-package internal
+package backoff
 
 import (
        "math/rand"
@@ -26,10 +26,17 @@ func init() {
        rand.Seed(time.Now().UnixNano())
 }
 
-// BackoffPolicy parameterize the following options in the reconnection logic 
to
+// Policy parameterize the following options in the reconnection logic to
 // allow users to customize the reconnection logic (minBackoff, maxBackoff and 
jitterPercentage)
-type BackoffPolicy interface {
+type Policy interface {
+       // Next returns the delay to wait before next retry
        Next() time.Duration
+
+       // IsMaxBackoffReached evaluates if the max number of retries is reached
+       IsMaxBackoffReached() bool
+
+       // Reset the backoff to the initial state
+       Reset()
 }
 
 // DefaultBackoff computes the delay before retrying an action.
@@ -38,6 +45,13 @@ type DefaultBackoff struct {
        backoff time.Duration
 }
 
+func NewDefaultBackoff() Policy {
+       return &DefaultBackoff{}
+}
+func NewDefaultBackoffWithInitialBackOff(backoff time.Duration) Policy {
+       return &DefaultBackoff{backoff: backoff / 2}
+}
+
 const maxBackoff = 60 * time.Second
 
 // Next returns the delay to wait before next retry
@@ -61,3 +75,7 @@ func (b *DefaultBackoff) Next() time.Duration {
 func (b *DefaultBackoff) IsMaxBackoffReached() bool {
        return b.backoff >= maxBackoff
 }
+
+func (b *DefaultBackoff) Reset() {
+       b.backoff = 0
+}
diff --git a/pulsar/internal/backoff_test.go b/pulsar/backoff/backoff_test.go
similarity index 96%
rename from pulsar/internal/backoff_test.go
rename to pulsar/backoff/backoff_test.go
index e05ea292..fc0a4923 100644
--- a/pulsar/internal/backoff_test.go
+++ b/pulsar/backoff/backoff_test.go
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-package internal
+package backoff
 
 import (
        "testing"
@@ -58,4 +58,6 @@ func TestBackoff_NextMaxValue(t *testing.T) {
        assert.Equal(t, true, backoff.IsMaxBackoffReached())
        // max value is 60 seconds + 20% jitter = 72 seconds
        assert.LessOrEqual(t, int64(cappedDelay), int64(72*time.Second))
+       backoff.Reset()
+       assert.Equal(t, false, backoff.IsMaxBackoffReached())
 }
diff --git a/pulsar/blue_green_migration_test.go b/pulsar/blue_green_migration_test.go
index 91667e8d..672ef343 100644
--- a/pulsar/blue_green_migration_test.go
+++ b/pulsar/blue_green_migration_test.go
@@ -54,8 +54,8 @@ func (suite *BlueGreenMigrationTestSuite) 
TestTopicMigration() {
        for _, scenario := range []topicUnloadTestCase{
 
                {
-                       testCaseName: "proxyConnection",
-                       blueAdminURL: "http://localhost:8080",
+                       testCaseName:  "proxyConnection",
+                       blueAdminURL:  "http://localhost:8080",
                        blueClientUrl: "pulsar://localhost:6650",
                        greenAdminURL: "http://localhost:8081",
                        migrationBody: `
@@ -83,17 +83,17 @@ func testTopicMigrate(
        migrationBody string) {
        runtime.GOMAXPROCS(1)
        const (
-               cluster = "cluster-a"
+               cluster   = "cluster-a"
                tenant    = utils.PUBLICTENANT
                namespace = utils.DEFAULTNAMESPACE
 
-               blueBroker1URL = "pulsar://broker-1:6650"
-               blueBroker2URL = "pulsar://broker-2:6650"
+               blueBroker1URL  = "pulsar://broker-1:6650"
+               blueBroker2URL  = "pulsar://broker-2:6650"
                greenBroker1URL = "pulsar://green-broker-1:6650"
                greenBroker2URL = "pulsar://green-broker-2:6650"
 
-               blueBroker1LookupURL = "broker-1:8080"
-               blueBroker2LookupURL = "broker-2:8080"
+               blueBroker1LookupURL  = "broker-1:8080"
+               blueBroker2LookupURL  = "broker-2:8080"
                greenBroker1LookupURL = "green-broker-1:8080"
                greenBroker2LookupURL = "green-broker-2:8080"
        )
@@ -234,7 +234,6 @@ func testTopicMigrate(
        req.NoError(err)
        req.NotEmpty(bundleRange)
 
-
        unloadURL := fmt.Sprintf(
                "/admin/v2/namespaces/%s/%s/%s/unload?destinationBroker=%s",
                tenant, namespace, bundleRange, dstTopicBrokerLookupURL)
diff --git a/pulsar/consumer.go b/pulsar/consumer.go
index bf2eafbf..880cad56 100644
--- a/pulsar/consumer.go
+++ b/pulsar/consumer.go
@@ -21,7 +21,7 @@ import (
        "context"
        "time"
 
-       "github.com/apache/pulsar-client-go/pulsar/internal"
+       "github.com/apache/pulsar-client-go/pulsar/backoff"
 )
 
 // ConsumerMessage represents a pair of a Consumer and Message.
@@ -207,9 +207,9 @@ type ConsumerOptions struct {
        // MaxReconnectToBroker sets the maximum retry number of 
reconnectToBroker. (default: ultimate)
        MaxReconnectToBroker *uint
 
-       // BackoffPolicy parameterize the following options in the reconnection 
logic to
+       // BackOffPolicyFunc parameterize the following options in the 
reconnection logic to
        // allow users to customize the reconnection logic (minBackoff, 
maxBackoff and jitterPercentage)
-       BackoffPolicy internal.BackoffPolicy
+       BackOffPolicyFunc func() backoff.Policy
 
        // Decryption represents the encryption related fields required by the 
consumer to decrypt a message.
        Decryption *MessageDecryptionInfo
diff --git a/pulsar/consumer_impl.go b/pulsar/consumer_impl.go
index 679054a0..a3d3e3ff 100644
--- a/pulsar/consumer_impl.go
+++ b/pulsar/consumer_impl.go
@@ -174,11 +174,12 @@ func newConsumer(client *client, options ConsumerOptions) 
(Consumer, error) {
                }
        }
 
-       dlq, err := newDlqRouter(client, options.DLQ, options.Topic, 
options.SubscriptionName, options.Name, client.log)
+       dlq, err := newDlqRouter(client, options.DLQ, options.Topic, 
options.SubscriptionName, options.Name,
+               options.BackOffPolicyFunc, client.log)
        if err != nil {
                return nil, err
        }
-       rlq, err := newRetryRouter(client, options.DLQ, options.RetryEnable, 
client.log)
+       rlq, err := newRetryRouter(client, options.DLQ, options.RetryEnable, 
options.BackOffPolicyFunc, client.log)
        if err != nil {
                return nil, err
        }
@@ -453,7 +454,7 @@ func newPartitionConsumerOpts(topic, consumerName string, 
idx int, options Consu
                readCompacted:               options.ReadCompacted,
                interceptors:                options.Interceptors,
                maxReconnectToBroker:        options.MaxReconnectToBroker,
-               backoffPolicy:               options.BackoffPolicy,
+               backOffPolicyFunc:           options.BackOffPolicyFunc,
                keySharedPolicy:             options.KeySharedPolicy,
                schema:                      options.Schema,
                decryption:                  options.Decryption,
diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go
index f307972c..16aef5d4 100644
--- a/pulsar/consumer_partition.go
+++ b/pulsar/consumer_partition.go
@@ -26,6 +26,8 @@ import (
        "sync"
        "time"
 
+       "github.com/apache/pulsar-client-go/pulsar/backoff"
+
        "google.golang.org/protobuf/proto"
 
        "github.com/apache/pulsar-client-go/pulsar/crypto"
@@ -110,7 +112,7 @@ type partitionConsumerOpts struct {
        disableForceTopicCreation   bool
        interceptors                ConsumerInterceptors
        maxReconnectToBroker        *uint
-       backoffPolicy               internal.BackoffPolicy
+       backOffPolicyFunc           func() backoff.Policy
        keySharedPolicy             *KeySharedPolicy
        schema                      Schema
        decryption                  *MessageDecryptionInfo
@@ -182,6 +184,7 @@ type partitionConsumer struct {
        lastMessageInBroker *trackingMessageID
 
        redirectedClusterURI string
+       backoffPolicyFunc    func() backoff.Policy
 }
 
 func (pc *partitionConsumer) ActiveConsumerChanged(isActive bool) {
@@ -318,6 +321,13 @@ func (s *schemaInfoCache) add(schemaVersionHash string, 
schema Schema) {
 func newPartitionConsumer(parent Consumer, client *client, options 
*partitionConsumerOpts,
        messageCh chan ConsumerMessage, dlq *dlqRouter,
        metrics *internal.LeveledMetrics) (*partitionConsumer, error) {
+       var boFunc func() backoff.Policy
+       if options.backOffPolicyFunc != nil {
+               boFunc = options.backOffPolicyFunc
+       } else {
+               boFunc = backoff.NewDefaultBackoff
+       }
+
        pc := &partitionConsumer{
                parentConsumer:       parent,
                client:               client,
@@ -339,6 +349,7 @@ func newPartitionConsumer(parent Consumer, client *client, 
options *partitionCon
                dlq:                  dlq,
                metrics:              metrics,
                schemaInfoCache:      newSchemaInfoCache(client, options.topic),
+               backoffPolicyFunc:    boFunc,
        }
        if pc.options.autoReceiverQueueSize {
                pc.currentQueueSize.Store(initialReceiverQueueSize)
@@ -581,12 +592,7 @@ func (pc *partitionConsumer) getLastMessageID() 
(*trackingMessageID, error) {
                return nil, errors.New("failed to getLastMessageID for the 
closing or closed consumer")
        }
        remainTime := pc.client.operationTimeout
-       var backoff internal.BackoffPolicy
-       if pc.options.backoffPolicy != nil {
-               backoff = pc.options.backoffPolicy
-       } else {
-               backoff = &internal.DefaultBackoff{}
-       }
+       bo := pc.backoffPolicyFunc()
        request := func() (*trackingMessageID, error) {
                req := &getLastMsgIDRequest{doneCh: make(chan struct{})}
                pc.eventsCh <- req
@@ -604,7 +610,7 @@ func (pc *partitionConsumer) getLastMessageID() 
(*trackingMessageID, error) {
                        pc.log.WithError(err).Error("Failed to 
getLastMessageID")
                        return nil, fmt.Errorf("failed to getLastMessageID due 
to %w", err)
                }
-               nextDelay := backoff.Next()
+               nextDelay := bo.Next()
                if nextDelay > remainTime {
                        nextDelay = remainTime
                }
@@ -1684,18 +1690,17 @@ func (pc *partitionConsumer) internalClose(req 
*closeRequest) {
 }
 
 func (pc *partitionConsumer) reconnectToBroker(connectionClosed 
*connectionClosed) {
-       var maxRetry int
+       var (
+               maxRetry                                    int
+               delayReconnectTime, totalDelayReconnectTime time.Duration
+       )
 
        if pc.options.maxReconnectToBroker == nil {
                maxRetry = -1
        } else {
                maxRetry = int(*pc.options.maxReconnectToBroker)
        }
-
-       var (
-               delayReconnectTime time.Duration
-               defaultBackoff     = internal.DefaultBackoff{}
-       )
+       bo := pc.backoffPolicyFunc()
 
        for maxRetry != 0 {
                if pc.getConsumerState() != consumerReady {
@@ -1710,11 +1715,10 @@ func (pc *partitionConsumer) 
reconnectToBroker(connectionClosed *connectionClose
                        delayReconnectTime = 0
                        assignedBrokerURL = connectionClosed.assignedBrokerURL
                        connectionClosed = nil // Attempt connecting to the 
assigned broker just once
-               } else if pc.options.backoffPolicy == nil {
-                       delayReconnectTime = defaultBackoff.Next()
                } else {
-                       delayReconnectTime = pc.options.backoffPolicy.Next()
+                       delayReconnectTime = bo.Next()
                }
+               totalDelayReconnectTime += delayReconnectTime
 
                pc.log.WithFields(log.Fields{
                        "assignedBrokerURL":  assignedBrokerURL,
@@ -1733,6 +1737,7 @@ func (pc *partitionConsumer) 
reconnectToBroker(connectionClosed *connectionClose
                if err == nil {
                        // Successfully reconnected
                        pc.log.Info("Reconnected consumer to broker")
+                       bo.Reset()
                        return
                }
                pc.log.WithError(err).Error("Failed to create consumer at 
reconnect")
@@ -1747,7 +1752,7 @@ func (pc *partitionConsumer) 
reconnectToBroker(connectionClosed *connectionClose
                        maxRetry--
                }
                pc.metrics.ConsumersReconnectFailure.Inc()
-               if maxRetry == 0 || defaultBackoff.IsMaxBackoffReached() {
+               if maxRetry == 0 || bo.IsMaxBackoffReached() {
                        pc.metrics.ConsumersReconnectMaxRetry.Inc()
                }
        }
diff --git a/pulsar/consumer_regex_test.go b/pulsar/consumer_regex_test.go
index 28ba4f72..e1e2ca29 100644
--- a/pulsar/consumer_regex_test.go
+++ b/pulsar/consumer_regex_test.go
@@ -159,8 +159,9 @@ func runRegexConsumerDiscoverPatternAll(t *testing.T, c 
Client, namespace string
                Name:                "regex-consumer",
        }
 
-       dlq, _ := newDlqRouter(c.(*client), nil, tn.Topic, "regex-sub", 
"regex-consumer", log.DefaultNopLogger())
-       rlq, _ := newRetryRouter(c.(*client), nil, false, 
log.DefaultNopLogger())
+       dlq, _ := newDlqRouter(c.(*client), nil, tn.Topic, "regex-sub", 
"regex-consumer",
+               nil, log.DefaultNopLogger())
+       rlq, _ := newRetryRouter(c.(*client), nil, false, nil, 
log.DefaultNopLogger())
        consumer, err := newRegexConsumer(c.(*client), opts, tn, pattern, 
make(chan ConsumerMessage, 1), dlq, rlq)
        if err != nil {
                t.Fatal(err)
@@ -198,8 +199,9 @@ func runRegexConsumerDiscoverPatternFoo(t *testing.T, c 
Client, namespace string
                Name:                "regex-consumer",
        }
 
-       dlq, _ := newDlqRouter(c.(*client), nil, tn.Topic, "regex-sub", 
"regex-consumer", log.DefaultNopLogger())
-       rlq, _ := newRetryRouter(c.(*client), nil, false, 
log.DefaultNopLogger())
+       dlq, _ := newDlqRouter(c.(*client), nil, tn.Topic, "regex-sub", 
"regex-consumer",
+               nil, log.DefaultNopLogger())
+       rlq, _ := newRetryRouter(c.(*client), nil, false, nil, 
log.DefaultNopLogger())
        consumer, err := newRegexConsumer(c.(*client), opts, tn, pattern, 
make(chan ConsumerMessage, 1), dlq, rlq)
        if err != nil {
                t.Fatal(err)
diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go
index 04439cbc..a4a8d995 100644
--- a/pulsar/consumer_test.go
+++ b/pulsar/consumer_test.go
@@ -30,6 +30,8 @@ import (
        "testing"
        "time"
 
+       "github.com/apache/pulsar-client-go/pulsar/backoff"
+
        "github.com/apache/pulsar-client-go/pulsaradmin"
        "github.com/apache/pulsar-client-go/pulsaradmin/pkg/admin/config"
        "github.com/apache/pulsar-client-go/pulsaradmin/pkg/utils"
@@ -3874,12 +3876,14 @@ func TestConsumerWithBackoffPolicy(t *testing.T) {
 
        topicName := newTopicName()
 
-       backoff := newTestBackoffPolicy(1*time.Second, 4*time.Second)
+       bo := newTestBackoffPolicy(1*time.Second, 4*time.Second)
        _consumer, err := client.Subscribe(ConsumerOptions{
                Topic:            topicName,
                SubscriptionName: "sub-1",
                Type:             Shared,
-               BackoffPolicy:    backoff,
+               BackOffPolicyFunc: func() backoff.Policy {
+                       return bo
+               },
        })
        assert.Nil(t, err)
        defer _consumer.Close()
@@ -3888,22 +3892,22 @@ func TestConsumerWithBackoffPolicy(t *testing.T) {
        // 1 s
        startTime := time.Now()
        partitionConsumerImp.reconnectToBroker(nil)
-       assert.True(t, backoff.IsExpectedIntervalFrom(startTime))
+       assert.True(t, bo.IsExpectedIntervalFrom(startTime))
 
        // 2 s
        startTime = time.Now()
        partitionConsumerImp.reconnectToBroker(nil)
-       assert.True(t, backoff.IsExpectedIntervalFrom(startTime))
+       assert.True(t, bo.IsExpectedIntervalFrom(startTime))
 
        // 4 s
        startTime = time.Now()
        partitionConsumerImp.reconnectToBroker(nil)
-       assert.True(t, backoff.IsExpectedIntervalFrom(startTime))
+       assert.True(t, bo.IsExpectedIntervalFrom(startTime))
 
        // 4 s
        startTime = time.Now()
        partitionConsumerImp.reconnectToBroker(nil)
-       assert.True(t, backoff.IsExpectedIntervalFrom(startTime))
+       assert.True(t, bo.IsExpectedIntervalFrom(startTime))
 }
 
 func TestAckWithMessageID(t *testing.T) {
diff --git a/pulsar/dlq_router.go b/pulsar/dlq_router.go
index 647c022d..6b13b329 100644
--- a/pulsar/dlq_router.go
+++ b/pulsar/dlq_router.go
@@ -22,31 +22,40 @@ import (
        "fmt"
        "time"
 
-       "github.com/apache/pulsar-client-go/pulsar/internal"
+       "github.com/apache/pulsar-client-go/pulsar/backoff"
+
        "github.com/apache/pulsar-client-go/pulsar/log"
 )
 
 type dlqRouter struct {
-       client           Client
-       producer         Producer
-       policy           *DLQPolicy
-       messageCh        chan ConsumerMessage
-       closeCh          chan interface{}
-       topicName        string
-       subscriptionName string
-       consumerName     string
-       log              log.Logger
+       client            Client
+       producer          Producer
+       policy            *DLQPolicy
+       messageCh         chan ConsumerMessage
+       closeCh           chan interface{}
+       topicName         string
+       subscriptionName  string
+       consumerName      string
+       backOffPolicyFunc func() backoff.Policy
+       log               log.Logger
 }
 
 func newDlqRouter(client Client, policy *DLQPolicy, topicName, 
subscriptionName, consumerName string,
-       logger log.Logger) (*dlqRouter, error) {
+       backOffPolicyFunc func() backoff.Policy, logger log.Logger) 
(*dlqRouter, error) {
+       var boFunc func() backoff.Policy
+       if backOffPolicyFunc != nil {
+               boFunc = backOffPolicyFunc
+       } else {
+               boFunc = backoff.NewDefaultBackoff
+       }
        r := &dlqRouter{
-               client:           client,
-               policy:           policy,
-               topicName:        topicName,
-               subscriptionName: subscriptionName,
-               consumerName:     consumerName,
-               log:              logger,
+               client:            client,
+               policy:            policy,
+               topicName:         topicName,
+               subscriptionName:  subscriptionName,
+               consumerName:      consumerName,
+               backOffPolicyFunc: boFunc,
+               log:               logger,
        }
 
        if policy != nil {
@@ -155,7 +164,7 @@ func (r *dlqRouter) getProducer(schema Schema) Producer {
        }
 
        // Retry to create producer indefinitely
-       backoff := &internal.DefaultBackoff{}
+       bo := r.backOffPolicyFunc()
        for {
                opt := r.policy.ProducerOptions
                opt.Topic = r.policy.DeadLetterTopic
@@ -174,7 +183,7 @@ func (r *dlqRouter) getProducer(schema Schema) Producer {
 
                if err != nil {
                        r.log.WithError(err).Error("Failed to create DLQ 
producer")
-                       time.Sleep(backoff.Next())
+                       time.Sleep(bo.Next())
                        continue
                } else {
                        r.producer = producer
diff --git a/pulsar/internal/http_client.go b/pulsar/internal/http_client.go
index e68bd17c..eea0101a 100644
--- a/pulsar/internal/http_client.go
+++ b/pulsar/internal/http_client.go
@@ -29,6 +29,8 @@ import (
        "path"
        "time"
 
+       "github.com/apache/pulsar-client-go/pulsar/backoff"
+
        "github.com/apache/pulsar-client-go/pulsar/auth"
 
        "github.com/apache/pulsar-client-go/pulsar/log"
@@ -148,12 +150,12 @@ func (c *httpClient) Get(endpoint string, obj 
interface{}, params map[string]str
        if _, ok := err.(*url.Error); ok {
                // We can retry this kind of requests over a connection error 
because they're
                // not specific to a particular broker.
-               backoff := DefaultBackoff{100 * time.Millisecond}
+               bo := backoff.NewDefaultBackoffWithInitialBackOff(100 * 
time.Millisecond)
                startTime := time.Now()
                var retryTime time.Duration
 
                for time.Since(startTime) < c.requestTimeout {
-                       retryTime = backoff.Next()
+                       retryTime = bo.Next()
                        c.log.Debugf("Retrying httpRequest in {%v} with timeout 
in {%v}", retryTime, c.requestTimeout)
                        time.Sleep(retryTime)
                        _, err = c.GetWithQueryParams(endpoint, obj, params, 
true)
diff --git a/pulsar/internal/rpc_client.go b/pulsar/internal/rpc_client.go
index d2e3895e..0f993116 100644
--- a/pulsar/internal/rpc_client.go
+++ b/pulsar/internal/rpc_client.go
@@ -25,6 +25,8 @@ import (
        "sync/atomic"
        "time"
 
+       "github.com/apache/pulsar-client-go/pulsar/backoff"
+
        "github.com/apache/pulsar-client-go/pulsar/auth"
        "github.com/apache/pulsar-client-go/pulsar/log"
 
@@ -115,7 +117,7 @@ func (c *rpcClient) requestToHost(serviceNameResolver 
*ServiceNameResolver,
        var host *url.URL
        var rpcResult *RPCResult
        startTime := time.Now()
-       backoff := DefaultBackoff{100 * time.Millisecond}
+       bo := backoff.NewDefaultBackoffWithInitialBackOff(100 * 
time.Millisecond)
        // we can retry these requests because this kind of request is
        // not specific to any particular broker
        for time.Since(startTime) < c.requestTimeout {
@@ -130,7 +132,7 @@ func (c *rpcClient) requestToHost(serviceNameResolver 
*ServiceNameResolver,
                        break
                }
 
-               retryTime := backoff.Next()
+               retryTime := bo.Next()
                c.log.Debugf("Retrying request in {%v} with timeout in {%v}", 
retryTime, c.requestTimeout)
                time.Sleep(retryTime)
        }
diff --git a/pulsar/producer.go b/pulsar/producer.go
index 0ae51bd4..997f9c0d 100644
--- a/pulsar/producer.go
+++ b/pulsar/producer.go
@@ -21,7 +21,7 @@ import (
        "context"
        "time"
 
-       "github.com/apache/pulsar-client-go/pulsar/internal"
+       "github.com/apache/pulsar-client-go/pulsar/backoff"
 )
 
 type HashingScheme int
@@ -171,9 +171,9 @@ type ProducerOptions struct {
        // MaxReconnectToBroker specifies the maximum retry number of 
reconnectToBroker. (default: ultimate)
        MaxReconnectToBroker *uint
 
-       // BackoffPolicy parameterize the following options in the reconnection 
logic to
+       // BackOffPolicyFunc parameterize the following options in the 
reconnection logic to
        // allow users to customize the reconnection logic (minBackoff, 
maxBackoff and jitterPercentage)
-       BackoffPolicy internal.BackoffPolicy
+       BackOffPolicyFunc func() backoff.Policy
 
        // BatcherBuilderType sets the batch builder type (default 
DefaultBatchBuilder)
        // This will be used to create batch container when batching is enabled.
diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index af5fb38a..f578ee4b 100755
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -28,6 +28,8 @@ import (
        "sync/atomic"
        "time"
 
+       "github.com/apache/pulsar-client-go/pulsar/backoff"
+
        "github.com/apache/pulsar-client-go/pulsar/internal/compression"
        internalcrypto 
"github.com/apache/pulsar-client-go/pulsar/internal/crypto"
 
@@ -121,6 +123,7 @@ type partitionProducer struct {
        redirectedClusterURI string
        ctx                  context.Context
        cancelFunc           context.CancelFunc
+       backOffPolicyFunc    func() backoff.Policy
 }
 
 type schemaCache struct {
@@ -147,6 +150,14 @@ func (s *schemaCache) Get(schema *SchemaInfo) 
(schemaVersion []byte) {
 func newPartitionProducer(client *client, topic string, options 
*ProducerOptions, partitionIdx int,
        metrics *internal.LeveledMetrics) (
        *partitionProducer, error) {
+
+       var boFunc func() backoff.Policy
+       if options.BackOffPolicyFunc != nil {
+               boFunc = options.BackOffPolicyFunc
+       } else {
+               boFunc = backoff.NewDefaultBackoff
+       }
+
        var batchingMaxPublishDelay time.Duration
        if options.BatchingMaxPublishDelay != 0 {
                batchingMaxPublishDelay = options.BatchingMaxPublishDelay
@@ -176,15 +187,16 @@ func newPartitionProducer(client *client, topic string, 
options *ProducerOptions
                batchFlushTicker: time.NewTicker(batchingMaxPublishDelay),
                compressionProvider: 
internal.GetCompressionProvider(pb.CompressionType(options.CompressionType),
                        compression.Level(options.CompressionLevel)),
-               publishSemaphore: 
internal.NewSemaphore(int32(maxPendingMessages)),
-               pendingQueue:     internal.NewBlockingQueue(maxPendingMessages),
-               lastSequenceID:   -1,
-               partitionIdx:     int32(partitionIdx),
-               metrics:          metrics,
-               epoch:            0,
-               schemaCache:      newSchemaCache(),
-               ctx:              ctx,
-               cancelFunc:       cancelFunc,
+               publishSemaphore:  
internal.NewSemaphore(int32(maxPendingMessages)),
+               pendingQueue:      
internal.NewBlockingQueue(maxPendingMessages),
+               lastSequenceID:    -1,
+               partitionIdx:      int32(partitionIdx),
+               metrics:           metrics,
+               epoch:             0,
+               schemaCache:       newSchemaCache(),
+               ctx:               ctx,
+               cancelFunc:        cancelFunc,
+               backOffPolicyFunc: boFunc,
        }
        if p.options.DisableBatching {
                p.batchFlushTicker.Stop()
@@ -458,17 +470,17 @@ func (p *partitionProducer) getOrCreateSchema(schemaInfo 
*SchemaInfo) (schemaVer
 }
 
 func (p *partitionProducer) reconnectToBroker(connectionClosed 
*connectionClosed) {
-       var maxRetry int
+       var (
+               maxRetry           int
+               delayReconnectTime time.Duration
+       )
        if p.options.MaxReconnectToBroker == nil {
                maxRetry = -1
        } else {
                maxRetry = int(*p.options.MaxReconnectToBroker)
        }
 
-       var (
-               delayReconnectTime time.Duration
-               defaultBackoff     = internal.DefaultBackoff{}
-       )
+       bo := p.backOffPolicyFunc()
 
        for maxRetry != 0 {
                select {
@@ -489,10 +501,8 @@ func (p *partitionProducer) 
reconnectToBroker(connectionClosed *connectionClosed
                        delayReconnectTime = 0
                        assignedBrokerURL = connectionClosed.assignedBrokerURL
                        connectionClosed = nil // Only attempt once
-               } else if p.options.BackoffPolicy == nil {
-                       delayReconnectTime = defaultBackoff.Next()
                } else {
-                       delayReconnectTime = p.options.BackoffPolicy.Next()
+                       delayReconnectTime = bo.Next()
                }
 
                p.log.WithFields(log.Fields{
@@ -513,6 +523,7 @@ func (p *partitionProducer) 
reconnectToBroker(connectionClosed *connectionClosed
                if err == nil {
                        // Successfully reconnected
                        p.log.WithField("cnx", 
p._getConn().ID()).Info("Reconnected producer to broker")
+                       bo.Reset()
                        return
                }
                p.log.WithError(err).Error("Failed to create producer at 
reconnect")
@@ -546,7 +557,7 @@ func (p *partitionProducer) 
reconnectToBroker(connectionClosed *connectionClosed
                        maxRetry--
                }
                p.metrics.ProducersReconnectFailure.Inc()
-               if maxRetry == 0 || defaultBackoff.IsMaxBackoffReached() {
+               if maxRetry == 0 || bo.IsMaxBackoffReached() {
                        p.metrics.ProducersReconnectMaxRetry.Inc()
                }
        }
diff --git a/pulsar/producer_test.go b/pulsar/producer_test.go
index 5b23182d..afd1f09a 100644
--- a/pulsar/producer_test.go
+++ b/pulsar/producer_test.go
@@ -34,6 +34,8 @@ import (
        "github.com/testcontainers/testcontainers-go"
        "github.com/testcontainers/testcontainers-go/wait"
 
+       "github.com/apache/pulsar-client-go/pulsar/backoff"
+
        "github.com/stretchr/testify/assert"
        "google.golang.org/protobuf/proto"
 
@@ -1293,11 +1295,13 @@ func TestProducerWithBackoffPolicy(t *testing.T) {
 
        topicName := newTopicName()
 
-       backoff := newTestBackoffPolicy(1*time.Second, 4*time.Second)
+       bo := newTestBackoffPolicy(1*time.Second, 4*time.Second)
        _producer, err := client.CreateProducer(ProducerOptions{
-               Topic:         topicName,
-               SendTimeout:   2 * time.Second,
-               BackoffPolicy: backoff,
+               Topic:       topicName,
+               SendTimeout: 2 * time.Second,
+               BackOffPolicyFunc: func() backoff.Policy {
+                       return bo
+               },
        })
        assert.Nil(t, err)
        defer _producer.Close()
@@ -1306,22 +1310,22 @@ func TestProducerWithBackoffPolicy(t *testing.T) {
        // 1 s
        startTime := time.Now()
        partitionProducerImp.reconnectToBroker(nil)
-       assert.True(t, backoff.IsExpectedIntervalFrom(startTime))
+       assert.True(t, bo.IsExpectedIntervalFrom(startTime))
 
        // 2 s
        startTime = time.Now()
        partitionProducerImp.reconnectToBroker(nil)
-       assert.True(t, backoff.IsExpectedIntervalFrom(startTime))
+       assert.True(t, bo.IsExpectedIntervalFrom(startTime))
 
        // 4 s
        startTime = time.Now()
        partitionProducerImp.reconnectToBroker(nil)
-       assert.True(t, backoff.IsExpectedIntervalFrom(startTime))
+       assert.True(t, bo.IsExpectedIntervalFrom(startTime))
 
        // 4 s
        startTime = time.Now()
        partitionProducerImp.reconnectToBroker(nil)
-       assert.True(t, backoff.IsExpectedIntervalFrom(startTime))
+       assert.True(t, bo.IsExpectedIntervalFrom(startTime))
 }
 
 func TestSendContextExpired(t *testing.T) {
diff --git a/pulsar/reader.go b/pulsar/reader.go
index 4daa8890..98bde4e3 100644
--- a/pulsar/reader.go
+++ b/pulsar/reader.go
@@ -21,7 +21,7 @@ import (
        "context"
        "time"
 
-       "github.com/apache/pulsar-client-go/pulsar/internal"
+       "github.com/apache/pulsar-client-go/pulsar/backoff"
 )
 
 // ReaderMessage packages Reader and Message as a struct to use.
@@ -89,9 +89,9 @@ type ReaderOptions struct {
        // Schema represents the schema implementation.
        Schema Schema
 
-       // BackoffPolicy parameterize the following options in the reconnection logic to
+       // BackoffPolicyFunc parameterize the following options in the reconnection logic to
        // allow users to customize the reconnection logic (minBackoff, maxBackoff and jitterPercentage)
-       BackoffPolicy internal.BackoffPolicy
+       BackoffPolicyFunc func() backoff.Policy
 
        // MaxPendingChunkedMessage sets the maximum pending chunked messages. (default: 100)
        MaxPendingChunkedMessage int
diff --git a/pulsar/reader_impl.go b/pulsar/reader_impl.go
index 7cbae05c..f76255e2 100644
--- a/pulsar/reader_impl.go
+++ b/pulsar/reader_impl.go
@@ -112,7 +112,7 @@ func newReader(client *client, options ReaderOptions) (Reader, error) {
                ReplicateSubscriptionState:  false,
                Decryption:                  options.Decryption,
                Schema:                      options.Schema,
-               BackoffPolicy:               options.BackoffPolicy,
+               BackOffPolicyFunc:           options.BackoffPolicyFunc,
                MaxPendingChunkedMessage:    options.MaxPendingChunkedMessage,
                ExpireTimeOfIncompleteChunk: options.ExpireTimeOfIncompleteChunk,
                AutoAckIncompleteChunk:      options.AutoAckIncompleteChunk,
@@ -128,12 +128,13 @@ func newReader(client *client, options ReaderOptions) (Reader, error) {
        }
 
        // Provide dummy dlq router with not dlq policy
-       dlq, err := newDlqRouter(client, nil, options.Topic, options.SubscriptionName, options.Name, client.log)
+       dlq, err := newDlqRouter(client, nil, options.Topic, options.SubscriptionName, options.Name,
+               options.BackoffPolicyFunc, client.log)
        if err != nil {
                return nil, err
        }
        // Provide dummy rlq router with not dlq policy
-       rlq, err := newRetryRouter(client, nil, false, client.log)
+       rlq, err := newRetryRouter(client, nil, false, options.BackoffPolicyFunc, client.log)
        if err != nil {
                return nil, err
        }
diff --git a/pulsar/reader_test.go b/pulsar/reader_test.go
index d00346fc..3c928c1d 100644
--- a/pulsar/reader_test.go
+++ b/pulsar/reader_test.go
@@ -23,6 +23,8 @@ import (
        "testing"
        "time"
 
+       "github.com/apache/pulsar-client-go/pulsar/backoff"
+
        "github.com/apache/pulsar-client-go/pulsar/crypto"
        "github.com/apache/pulsar-client-go/pulsaradmin"
        "github.com/apache/pulsar-client-go/pulsaradmin/pkg/admin/config"
@@ -847,6 +849,13 @@ func (b *testBackoffPolicy) Next() time.Duration {
 
        return b.curBackoff
 }
+func (b *testBackoffPolicy) IsMaxBackoffReached() bool {
+       return false
+}
+
+func (b *testBackoffPolicy) Reset() {
+
+}
 
 func (b *testBackoffPolicy) IsExpectedIntervalFrom(startTime time.Time) bool {
        // Approximately equal to expected interval
@@ -866,11 +875,13 @@ func TestReaderWithBackoffPolicy(t *testing.T) {
        assert.Nil(t, err)
        defer client.Close()
 
-       backoff := newTestBackoffPolicy(1*time.Second, 4*time.Second)
+       bo := newTestBackoffPolicy(1*time.Second, 4*time.Second)
        _reader, err := client.CreateReader(ReaderOptions{
                Topic:          "my-topic",
                StartMessageID: LatestMessageID(),
-               BackoffPolicy:  backoff,
+               BackoffPolicyFunc: func() backoff.Policy {
+                       return bo
+               },
        })
        assert.NotNil(t, _reader)
        assert.Nil(t, err)
@@ -879,22 +890,22 @@ func TestReaderWithBackoffPolicy(t *testing.T) {
        // 1 s
        startTime := time.Now()
        partitionConsumerImp.reconnectToBroker(nil)
-       assert.True(t, backoff.IsExpectedIntervalFrom(startTime))
+       assert.True(t, bo.IsExpectedIntervalFrom(startTime))
 
        // 2 s
        startTime = time.Now()
        partitionConsumerImp.reconnectToBroker(nil)
-       assert.True(t, backoff.IsExpectedIntervalFrom(startTime))
+       assert.True(t, bo.IsExpectedIntervalFrom(startTime))
 
        // 4 s
        startTime = time.Now()
        partitionConsumerImp.reconnectToBroker(nil)
-       assert.True(t, backoff.IsExpectedIntervalFrom(startTime))
+       assert.True(t, bo.IsExpectedIntervalFrom(startTime))
 
        // 4 s
        startTime = time.Now()
        partitionConsumerImp.reconnectToBroker(nil)
-       assert.True(t, backoff.IsExpectedIntervalFrom(startTime))
+       assert.True(t, bo.IsExpectedIntervalFrom(startTime))
 }
 
 func TestReaderGetLastMessageID(t *testing.T) {
diff --git a/pulsar/retry_router.go b/pulsar/retry_router.go
index 75792adc..c8aa0b94 100644
--- a/pulsar/retry_router.go
+++ b/pulsar/retry_router.go
@@ -21,7 +21,8 @@ import (
        "context"
        "time"
 
-       "github.com/apache/pulsar-client-go/pulsar/internal"
+       "github.com/apache/pulsar-client-go/pulsar/backoff"
+
        "github.com/apache/pulsar-client-go/pulsar/log"
 )
 
@@ -44,19 +45,28 @@ type RetryMessage struct {
 }
 
 type retryRouter struct {
-       client    Client
-       producer  Producer
-       policy    *DLQPolicy
-       messageCh chan RetryMessage
-       closeCh   chan interface{}
-       log       log.Logger
+       client            Client
+       producer          Producer
+       policy            *DLQPolicy
+       messageCh         chan RetryMessage
+       closeCh           chan interface{}
+       backOffPolicyFunc func() backoff.Policy
+       log               log.Logger
 }
 
-func newRetryRouter(client Client, policy *DLQPolicy, retryEnabled bool, logger log.Logger) (*retryRouter, error) {
+func newRetryRouter(client Client, policy *DLQPolicy, retryEnabled bool, backOffPolicyFunc func() backoff.Policy,
+       logger log.Logger) (*retryRouter, error) {
+       var boFunc func() backoff.Policy
+       if backOffPolicyFunc != nil {
+               boFunc = backOffPolicyFunc
+       } else {
+               boFunc = backoff.NewDefaultBackoff
+       }
        r := &retryRouter{
-               client: client,
-               policy: policy,
-               log:    logger,
+               client:            client,
+               policy:            policy,
+               backOffPolicyFunc: boFunc,
+               log:               logger,
        }
 
        if policy != nil && retryEnabled {
@@ -124,7 +134,7 @@ func (r *retryRouter) getProducer() Producer {
        }
 
        // Retry to create producer indefinitely
-       backoff := &internal.DefaultBackoff{}
+       bo := r.backOffPolicyFunc()
        for {
                opt := r.policy.ProducerOptions
                opt.Topic = r.policy.RetryLetterTopic
@@ -138,7 +148,7 @@ func (r *retryRouter) getProducer() Producer {
 
                if err != nil {
                        r.log.WithError(err).Error("Failed to create RLQ producer")
-                       time.Sleep(backoff.Next())
+                       time.Sleep(bo.Next())
                        continue
                } else {
                        r.producer = producer
diff --git a/pulsar/transaction_coordinator_client.go b/pulsar/transaction_coordinator_client.go
index 1449d698..afde5427 100644
--- a/pulsar/transaction_coordinator_client.go
+++ b/pulsar/transaction_coordinator_client.go
@@ -24,6 +24,8 @@ import (
        "sync/atomic"
        "time"
 
+       "github.com/apache/pulsar-client-go/pulsar/backoff"
+
        "github.com/apache/pulsar-client-go/pulsar/internal"
        pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto"
        "github.com/apache/pulsar-client-go/pulsar/log"
@@ -143,7 +145,7 @@ func (t *transactionHandler) runEventsLoop() {
 
 func (t *transactionHandler) reconnectToBroker() {
        var delayReconnectTime time.Duration
-       var defaultBackoff = internal.DefaultBackoff{}
+       var defaultBackoff = backoff.DefaultBackoff{}
 
        for {
                if t.getState() == txnHandlerClosed {


Reply via email to