This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new 86c3e80  Add retry logic when get connection error (#157)
86c3e80 is described below

commit 86c3e808d0e6521757a54a6908e24d42fc099734
Author: 冉小龙 <[email protected]>
AuthorDate: Tue Jan 7 01:19:07 2020 +0800

    Add retry logic when get connection error (#157)
    
    * [issue:144] Add retry logic when get connection error
    
    Signed-off-by: xiaolong.ran <[email protected]>
    
    * replace fmt with log
    
    Signed-off-by: xiaolong.ran <[email protected]>
    
    * code format
    
    Signed-off-by: xiaolong.ran <[email protected]>
    
    * add coverage.html to .gitignore
    
    Signed-off-by: xiaolong.ran <[email protected]>
    
    * fix comments
    
    Signed-off-by: xiaolong.ran <[email protected]>
    
    * code format
    
    Signed-off-by: xiaolong.ran <[email protected]>
---
 .gitignore                           |  1 +
 pulsar/internal/connection_reader.go |  2 +-
 pulsar/internal/rpc_client.go        | 30 +++++++++++++++++++++++++++---
 3 files changed, 29 insertions(+), 4 deletions(-)

diff --git a/.gitignore b/.gitignore
index bc14ef8..3db3c8c 100644
--- a/.gitignore
+++ b/.gitignore
@@ -9,6 +9,7 @@
 
 # Output of the go coverage tool
 *.out
+coverage.html
 
 perf/perf
 pulsar-perf
diff --git a/pulsar/internal/connection_reader.go b/pulsar/internal/connection_reader.go
index 6cbf650..0485774 100644
--- a/pulsar/internal/connection_reader.go
+++ b/pulsar/internal/connection_reader.go
@@ -50,7 +50,7 @@ func (r *connectionReader) readFromConnection() {
                }
 
                // Process
-               var payloadLen uint32 = 0
+               var payloadLen uint32
                if headersAndPayload != nil {
                        payloadLen = headersAndPayload.ReadableBytes()
                }
diff --git a/pulsar/internal/rpc_client.go b/pulsar/internal/rpc_client.go
index 23c3b61..dda701f 100644
--- a/pulsar/internal/rpc_client.go
+++ b/pulsar/internal/rpc_client.go
@@ -26,6 +26,8 @@ import (
 
        "github.com/apache/pulsar-client-go/pulsar/internal/pb"
        "github.com/golang/protobuf/proto"
+
+       log "github.com/sirupsen/logrus"
 )
 
 type RPCResult struct {
@@ -59,6 +61,8 @@ type rpcClient struct {
        requestIDGenerator  uint64
        producerIDGenerator uint64
        consumerIDGenerator uint64
+
+       log *log.Entry
 }
 
 func NewRPCClient(serviceURL *url.URL, pool ConnectionPool, requestTimeout time.Duration) RPCClient {
@@ -66,6 +70,7 @@ func NewRPCClient(serviceURL *url.URL, pool ConnectionPool, requestTimeout time.
                serviceURL:     serviceURL,
                pool:           pool,
                requestTimeout: requestTimeout,
+               log:            log.WithField("serviceURL", serviceURL),
        }
 }
 
@@ -76,8 +81,7 @@ func (c *rpcClient) RequestToAnyBroker(requestID uint64, cmdType pb.BaseCommand_
 
 func (c *rpcClient) Request(logicalAddr *url.URL, physicalAddr *url.URL, requestID uint64,
        cmdType pb.BaseCommand_Type, message proto.Message) (*RPCResult, error) {
-       // TODO: Add retry logic in case of connection issues
-       cnx, err := c.pool.GetConnection(logicalAddr, physicalAddr)
+       cnx, err := c.getConn(logicalAddr, physicalAddr)
        if err != nil {
                return nil, err
        }
@@ -88,7 +92,7 @@ func (c *rpcClient) Request(logicalAddr *url.URL, physicalAddr *url.URL, request
        }
        ch := make(chan Res)
 
-       // TODO: Handle errors with disconnections
+       // TODO: in here, the error of callback always nil
        cnx.SendRequest(requestID, baseCommand(cmdType, message), func(response *pb.BaseCommand, err error) {
                ch <- Res{&RPCResult{
                        Cnx:      cnx,
@@ -105,6 +109,26 @@ func (c *rpcClient) Request(logicalAddr *url.URL, physicalAddr *url.URL, request
        }
 }
 
+func (c *rpcClient) getConn(logicalAddr *url.URL, physicalAddr *url.URL) (Connection, error) {
+       cnx, err := c.pool.GetConnection(logicalAddr, physicalAddr)
+       backoff := new(Backoff)
+       var retryTime time.Duration
+       if err != nil {
+               for retryTime < c.requestTimeout {
+                       retryTime = backoff.Next()
+                       c.log.Debugf("Reconnecting to broker in {%v}", retryTime)
+                       time.Sleep(retryTime)
+                       cnx, err = c.pool.GetConnection(logicalAddr, physicalAddr)
+                       if err == nil {
+                               c.log.Debugf("retry connection success")
+                               return cnx, nil
+                       }
+               }
+               return nil, err
+       }
+       return cnx, nil
+}
+
 func (c *rpcClient) RequestOnCnx(cnx Connection, requestID uint64, cmdType pb.BaseCommand_Type,
        message proto.Message) (*RPCResult, error) {
        wg := sync.WaitGroup{}

Reply via email to