This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new 86c3e80 Add retry logic when get connection error (#157)
86c3e80 is described below
commit 86c3e808d0e6521757a54a6908e24d42fc099734
Author: 冉小龙 <[email protected]>
AuthorDate: Tue Jan 7 01:19:07 2020 +0800
Add retry logic when get connection error (#157)
* [issue:144] Add retry logic when get connection error
Signed-off-by: xiaolong.ran <[email protected]>
* replace fmt with log
Signed-off-by: xiaolong.ran <[email protected]>
* code formate
Signed-off-by: xiaolong.ran <[email protected]>
* add coverage.html to .gitignore
Signed-off-by: xiaolong.ran <[email protected]>
* fix comments
Signed-off-by: xiaolong.ran <[email protected]>
* code format
Signed-off-by: xiaolong.ran <[email protected]>
---
.gitignore | 1 +
pulsar/internal/connection_reader.go | 2 +-
pulsar/internal/rpc_client.go | 30 +++++++++++++++++++++++++++---
3 files changed, 29 insertions(+), 4 deletions(-)
diff --git a/.gitignore b/.gitignore
index bc14ef8..3db3c8c 100644
--- a/.gitignore
+++ b/.gitignore
@@ -9,6 +9,7 @@
# Output of the go coverage tool
*.out
+coverage.html
perf/perf
pulsar-perf
diff --git a/pulsar/internal/connection_reader.go
b/pulsar/internal/connection_reader.go
index 6cbf650..0485774 100644
--- a/pulsar/internal/connection_reader.go
+++ b/pulsar/internal/connection_reader.go
@@ -50,7 +50,7 @@ func (r *connectionReader) readFromConnection() {
}
// Process
- var payloadLen uint32 = 0
+ var payloadLen uint32
if headersAndPayload != nil {
payloadLen = headersAndPayload.ReadableBytes()
}
diff --git a/pulsar/internal/rpc_client.go b/pulsar/internal/rpc_client.go
index 23c3b61..dda701f 100644
--- a/pulsar/internal/rpc_client.go
+++ b/pulsar/internal/rpc_client.go
@@ -26,6 +26,8 @@ import (
"github.com/apache/pulsar-client-go/pulsar/internal/pb"
"github.com/golang/protobuf/proto"
+
+ log "github.com/sirupsen/logrus"
)
type RPCResult struct {
@@ -59,6 +61,8 @@ type rpcClient struct {
requestIDGenerator uint64
producerIDGenerator uint64
consumerIDGenerator uint64
+
+ log *log.Entry
}
func NewRPCClient(serviceURL *url.URL, pool ConnectionPool, requestTimeout
time.Duration) RPCClient {
@@ -66,6 +70,7 @@ func NewRPCClient(serviceURL *url.URL, pool ConnectionPool,
requestTimeout time.
serviceURL: serviceURL,
pool: pool,
requestTimeout: requestTimeout,
+ log: log.WithField("serviceURL", serviceURL),
}
}
@@ -76,8 +81,7 @@ func (c *rpcClient) RequestToAnyBroker(requestID uint64,
cmdType pb.BaseCommand_
func (c *rpcClient) Request(logicalAddr *url.URL, physicalAddr *url.URL,
requestID uint64,
cmdType pb.BaseCommand_Type, message proto.Message) (*RPCResult, error)
{
- // TODO: Add retry logic in case of connection issues
- cnx, err := c.pool.GetConnection(logicalAddr, physicalAddr)
+ cnx, err := c.getConn(logicalAddr, physicalAddr)
if err != nil {
return nil, err
}
@@ -88,7 +92,7 @@ func (c *rpcClient) Request(logicalAddr *url.URL,
physicalAddr *url.URL, request
}
ch := make(chan Res)
- // TODO: Handle errors with disconnections
+ // TODO: in here, the error of callback always nil
cnx.SendRequest(requestID, baseCommand(cmdType, message), func(response
*pb.BaseCommand, err error) {
ch <- Res{&RPCResult{
Cnx: cnx,
@@ -105,6 +109,26 @@ func (c *rpcClient) Request(logicalAddr *url.URL,
physicalAddr *url.URL, request
}
}
+func (c *rpcClient) getConn(logicalAddr *url.URL, physicalAddr *url.URL)
(Connection, error) {
+ cnx, err := c.pool.GetConnection(logicalAddr, physicalAddr)
+ backoff := new(Backoff)
+ var retryTime time.Duration
+ if err != nil {
+ for retryTime < c.requestTimeout {
+ retryTime = backoff.Next()
+ c.log.Debugf("Reconnecting to broker in {%v}",
retryTime)
+ time.Sleep(retryTime)
+ cnx, err = c.pool.GetConnection(logicalAddr,
physicalAddr)
+ if err == nil {
+ c.log.Debugf("retry connection success")
+ return cnx, nil
+ }
+ }
+ return nil, err
+ }
+ return cnx, nil
+}
+
func (c *rpcClient) RequestOnCnx(cnx Connection, requestID uint64, cmdType
pb.BaseCommand_Type,
message proto.Message) (*RPCResult, error) {
wg := sync.WaitGroup{}