This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new 44b8b4e  feat: support limit the retry number of reconnectToBroker 
(#360)
44b8b4e is described below

commit 44b8b4efc18b15d823e316ed0b7d92768dfa7ea7
Author: jony montana <[email protected]>
AuthorDate: Wed Oct 21 10:51:16 2020 +0800

    feat: support limit the retry number of reconnectToBroker (#360)
    
    Signed-off-by: jonyhy96 <[email protected]>
    
    Fixes #257
    
    ### Motivation
    
    Once the connection is closed, the reconnectToBroker logic of both producer 
and consumer will try to reconnect to the broker indefinitely.
    
    ### Modifications
    
    Add a field to the Options that represents the maximum number of retries, 
without breaking current behavior if this field is not set.
---
 pulsar/consumer.go           |  3 +++
 pulsar/consumer_impl.go      |  1 +
 pulsar/consumer_partition.go | 19 +++++++++++++++++--
 pulsar/producer.go           |  3 +++
 pulsar/producer_partition.go | 18 ++++++++++++++++--
 5 files changed, 40 insertions(+), 4 deletions(-)

diff --git a/pulsar/consumer.go b/pulsar/consumer.go
index 41ccb91..e9f803d 100644
--- a/pulsar/consumer.go
+++ b/pulsar/consumer.go
@@ -150,6 +150,9 @@ type ConsumerOptions struct {
 
        // A chain of interceptors, These interceptors will be called at some 
points defined in ConsumerInterceptor interface.
        Interceptors ConsumerInterceptors
+
+       // MaxReconnectToBroker sets the maximum number of retries for 
reconnectToBroker. (default: unlimited)
+       MaxReconnectToBroker *uint
 }
 
 // Consumer is an interface that abstracts behavior of Pulsar's consumer
diff --git a/pulsar/consumer_impl.go b/pulsar/consumer_impl.go
index 5cb1dd2..ed2af4b 100644
--- a/pulsar/consumer_impl.go
+++ b/pulsar/consumer_impl.go
@@ -309,6 +309,7 @@ func (c *consumer) internalTopicSubscribeToPartitions() 
error {
                                subscriptionMode:           durable,
                                readCompacted:              
c.options.ReadCompacted,
                                interceptors:               
c.options.Interceptors,
+                               maxReconnectToBroker:       
c.options.MaxReconnectToBroker,
                                keySharedPolicy:            
c.options.KeySharedPolicy,
                        }
                        cons, err := newPartitionConsumer(c, c.client, opts, 
c.messageCh, c.dlq)
diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go
index 8de3f38..2d5ff6b 100644
--- a/pulsar/consumer_partition.go
+++ b/pulsar/consumer_partition.go
@@ -120,6 +120,7 @@ type partitionConsumerOpts struct {
        readCompacted              bool
        disableForceTopicCreation  bool
        interceptors               ConsumerInterceptors
+       maxReconnectToBroker       *uint
        keySharedPolicy            *KeySharedPolicy
 }
 
@@ -815,8 +816,18 @@ func (pc *partitionConsumer) internalClose(req 
*closeRequest) {
 }
 
 func (pc *partitionConsumer) reconnectToBroker() {
-       backoff := internal.Backoff{}
-       for {
+       var (
+               maxRetry int
+               backoff  = internal.Backoff{}
+       )
+
+       if pc.options.maxReconnectToBroker == nil {
+               maxRetry = -1
+       } else {
+               maxRetry = int(*pc.options.maxReconnectToBroker)
+       }
+
+       for maxRetry != 0 {
                if pc.state != consumerReady {
                        // Consumer is already closing
                        return
@@ -832,6 +843,10 @@ func (pc *partitionConsumer) reconnectToBroker() {
                        pc.log.Info("Reconnected consumer to broker")
                        return
                }
+
+               if maxRetry > 0 {
+                       maxRetry--
+               }
        }
 }
 
diff --git a/pulsar/producer.go b/pulsar/producer.go
index cb38e3e..a6cee38 100644
--- a/pulsar/producer.go
+++ b/pulsar/producer.go
@@ -138,6 +138,9 @@ type ProducerOptions struct {
 
        // A chain of interceptors, These interceptors will be called at some 
points defined in ProducerInterceptor interface
        Interceptors ProducerInterceptors
+
+       // MaxReconnectToBroker sets the maximum number of retries for 
reconnectToBroker. (default: unlimited)
+       MaxReconnectToBroker *uint
 }
 
 // Producer is used to publish messages on a topic
diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index d245c33..399bcfe 100644
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -243,8 +243,18 @@ func (p *partitionProducer) ConnectionClosed() {
 }
 
 func (p *partitionProducer) reconnectToBroker() {
-       backoff := internal.Backoff{}
-       for {
+       var (
+               maxRetry int
+               backoff  = internal.Backoff{}
+       )
+
+       if p.options.MaxReconnectToBroker == nil {
+               maxRetry = -1
+       } else {
+               maxRetry = int(*p.options.MaxReconnectToBroker)
+       }
+
+       for maxRetry != 0 {
                if atomic.LoadInt32(&p.state) != producerReady {
                        // Producer is already closing
                        return
@@ -260,6 +270,10 @@ func (p *partitionProducer) reconnectToBroker() {
                        p.log.WithField("cnx", p.cnx.ID()).Info("Reconnected 
producer to broker")
                        return
                }
+
+               if maxRetry > 0 {
+                       maxRetry--
+               }
        }
 }
 

Reply via email to