This is an automated email from the ASF dual-hosted git repository.

yong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new fafa8c3  Fix reconnection logic when topic is deleted (#627)
fafa8c3 is described below

commit fafa8c37407ba2f3fb3066ec9b60124b8bb16847
Author: xiaolong ran <[email protected]>
AuthorDate: Wed Sep 29 17:47:17 2021 +0800

    Fix reconnection logic when topic is deleted (#627)
    
    Signed-off-by: xiaolongran <[email protected]>
    
    
    Fixes #623
    
    
    ### Motivation
    
    As #623 said, when the topic is forcibly deleted, we should not keep trying to 
reconnect; instead, we should give up reconnection.
    
    
    ### Modifications
    
    - Fix producer reconnection logic
    - Fix consumer reconnection logic
---
 pulsar/consumer_partition.go | 7 +++++++
 pulsar/producer_partition.go | 9 +++++++++
 2 files changed, 16 insertions(+)

diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go
index cf92949..5f74bcf 100644
--- a/pulsar/consumer_partition.go
+++ b/pulsar/consumer_partition.go
@@ -20,6 +20,7 @@ package pulsar
 import (
        "fmt"
        "math"
+       "strings"
        "sync"
        "time"
 
@@ -893,6 +894,12 @@ func (pc *partitionConsumer) reconnectToBroker() {
                        pc.log.Info("Reconnected consumer to broker")
                        return
                }
+               errMsg := err.Error()
+               if strings.Contains(errMsg, errTopicNotFount) {
+                       // when topic is deleted, we should give up 
reconnection.
+                       pc.log.Warn("Topic Not Found.")
+                       break
+               }
 
                if maxRetry > 0 {
                        maxRetry--
diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index 4ae4e00..e273021 100644
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -20,6 +20,7 @@ package pulsar
 import (
        "context"
        "fmt"
+       "strings"
        "sync"
        "sync/atomic"
        "time"
@@ -58,6 +59,8 @@ var (
        buffersPool sync.Pool
 )
 
+var errTopicNotFount = "TopicNotFound"
+
 type partitionProducer struct {
        state  ua.Int32
        client *client
@@ -350,6 +353,12 @@ func (p *partitionProducer) reconnectToBroker() {
                        p.log.WithField("cnx", p.cnx.ID()).Info("Reconnected 
producer to broker")
                        return
                }
+               errMsg := err.Error()
+               if strings.Contains(errMsg, errTopicNotFount) {
+                       // when topic is deleted, we should give up 
reconnection.
+                       p.log.Warn("Topic Not Found.")
+                       break
+               }
 
                if maxRetry > 0 {
                        maxRetry--

Reply via email to