This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new 77dccea  Handle RPC errors in reconnections (#83)
77dccea is described below

commit 77dccea9cb9f33a0975d7fda42eb44fa60031fb3
Author: Matteo Merli <[email protected]>
AuthorDate: Sat Nov 2 22:47:00 2019 -0700

    Handle RPC errors in reconnections (#83)
    
    ### Motivation
    
    We're not currently handling the ServerError responses in the RpcClient.
    That leaves the caller hanging when any request fails on the broker side.
---
 pulsar/impl_partition_consumer.go | 14 ++++++--------
 pulsar/impl_partition_producer.go | 11 +++++------
 pulsar/internal/connection.go     | 39 +++++++++++++++++++++++++++------------
 pulsar/internal/rpc_client.go     | 15 ++++++++++-----
 4 files changed, 48 insertions(+), 31 deletions(-)

diff --git a/pulsar/impl_partition_consumer.go b/pulsar/impl_partition_consumer.go
index 5ebbb06..2d8800d 100644
--- a/pulsar/impl_partition_consumer.go
+++ b/pulsar/impl_partition_consumer.go
@@ -693,24 +693,22 @@ func (pc *partitionConsumer) ConnectionClosed() {
 }
 
 func (pc *partitionConsumer) reconnectToBroker() {
-       pc.log.Info("Reconnecting to broker")
-       backoff := new(internal.Backoff)
+       backoff := internal.Backoff{}
        for {
                if pc.state != consumerReady {
                        // Consumer is already closing
                        return
                }
 
+               d := backoff.Next()
+               pc.log.Info("Reconnecting to broker in ", d)
+               time.Sleep(d)
+
                err := pc.grabCnx()
                if err == nil {
                        // Successfully reconnected
-                       pc.log.Info("Successfully reconnected")
+                       pc.log.Info("Reconnected consumer to broker")
                        return
                }
-
-               d := backoff.Next()
-               pc.log.Info("Retrying reconnection after ", d)
-
-               time.Sleep(d)
        }
 }
diff --git a/pulsar/impl_partition_producer.go b/pulsar/impl_partition_producer.go
index 196dbbc..56fbb6c 100644
--- a/pulsar/impl_partition_producer.go
+++ b/pulsar/impl_partition_producer.go
@@ -174,7 +174,6 @@ func (p *partitionProducer) ConnectionClosed() {
 }
 
 func (p *partitionProducer) reconnectToBroker() {
-       p.log.Info("Reconnecting to broker")
        backoff := internal.Backoff{}
        for {
                if p.state != producerReady {
@@ -182,16 +181,16 @@ func (p *partitionProducer) reconnectToBroker() {
                        return
                }
 
+               d := backoff.Next()
+               p.log.Info("Reconnecting to broker in ", d)
+               time.Sleep(d)
+
                err := p.grabCnx()
                if err == nil {
                        // Successfully reconnected
+                       p.log.Info("Reconnected producer to broker")
                        return
                }
-
-               d := backoff.Next()
-               p.log.Info("Retrying reconnection after ", d)
-
-               time.Sleep(d)
        }
 }
 
diff --git a/pulsar/internal/connection.go b/pulsar/internal/connection.go
index 592beca..2e933b8 100644
--- a/pulsar/internal/connection.go
+++ b/pulsar/internal/connection.go
@@ -21,6 +21,7 @@ import (
        "crypto/tls"
        "crypto/x509"
        "errors"
+       "fmt"
        "io/ioutil"
        "net"
        "net/url"
@@ -55,7 +56,7 @@ type ConnectionListener interface {
 
 // Connection is a interface of client cnx.
 type Connection interface {
-       SendRequest(requestID uint64, req *pb.BaseCommand, callback 
func(command *pb.BaseCommand))
+       SendRequest(requestID uint64, req *pb.BaseCommand, callback 
func(*pb.BaseCommand, error))
        WriteData(data []byte)
        RegisterListener(id uint64, listener ConnectionListener)
        UnregisterListener(id uint64)
@@ -103,7 +104,7 @@ const keepAliveInterval = 30 * time.Second
 type request struct {
        id       uint64
        cmd      *pb.BaseCommand
-       callback func(command *pb.BaseCommand)
+       callback func(command *pb.BaseCommand, err error)
 }
 
 type connection struct {
@@ -116,8 +117,8 @@ type connection struct {
        cnx          net.Conn
 
        writeBufferLock sync.Mutex
-       writeBuffer          Buffer
-       reader               *connectionReader
+       writeBuffer     Buffer
+       reader          *connectionReader
 
        lastDataReceivedLock sync.Mutex
        lastDataReceivedTime time.Time
@@ -135,7 +136,7 @@ type connection struct {
        listeners   map[uint64]ConnectionListener
 
        consumerHandlersLock sync.RWMutex
-       consumerHandlers map[uint64]ConsumerHandler
+       consumerHandlers     map[uint64]ConsumerHandler
 
        tlsOptions *TLSOptions
        auth       auth.Provider
@@ -362,11 +363,8 @@ func (c *connection) receivedCommand(cmd *pb.BaseCommand, 
headersAndPayload []by
                c.handleResponse(cmd.GetSchemaResponse.GetRequestId(), cmd)
 
        case pb.BaseCommand_ERROR:
-               if cmd.Error != nil {
-                       c.log.Errorf("Error: %s, Error Message: %s", 
cmd.Error.GetError(), cmd.Error.GetMessage())
-                       c.Close()
-                       return
-               }
+               c.handleResponseError(cmd.GetError())
+
        case pb.BaseCommand_CLOSE_PRODUCER:
                c.handleCloseProducer(cmd.GetCloseProducer())
        case pb.BaseCommand_CLOSE_CONSUMER:
@@ -399,7 +397,7 @@ func (c *connection) Write(data []byte) {
        c.writeRequestsCh <- data
 }
 
-func (c *connection) SendRequest(requestID uint64, req *pb.BaseCommand, 
callback func(command *pb.BaseCommand)) {
+func (c *connection) SendRequest(requestID uint64, req *pb.BaseCommand, 
callback func(command *pb.BaseCommand, err error)) {
        c.incomingRequestsCh <- &request{
                id:       requestID,
                cmd:      req,
@@ -424,7 +422,24 @@ func (c *connection) handleResponse(requestID uint64, 
response *pb.BaseCommand)
 
        delete(c.pendingReqs, requestID)
        c.mapMutex.RUnlock()
-       request.callback(response)
+       request.callback(response, nil)
+}
+
+func (c *connection) handleResponseError(serverError *pb.CommandError) {
+       requestID := serverError.GetRequestId()
+       c.mapMutex.RLock()
+       request, ok := c.pendingReqs[requestID]
+       if !ok {
+               c.log.Warnf("Received unexpected error response for request %d 
of type %s",
+                       requestID, serverError.GetError())
+               return
+       }
+
+       delete(c.pendingReqs, requestID)
+       c.mapMutex.RUnlock()
+
+       request.callback(nil,
+               errors.New(fmt.Sprintf("server error: %s: %s", 
serverError.GetError(), serverError.GetMessage())))
 }
 
 func (c *connection) handleSendReceipt(response *pb.CommandSendReceipt) {
diff --git a/pulsar/internal/rpc_client.go b/pulsar/internal/rpc_client.go
index 66e8a79..35d8b61 100644
--- a/pulsar/internal/rpc_client.go
+++ b/pulsar/internal/rpc_client.go
@@ -84,14 +84,17 @@ func (c *rpcClient) Request(logicalAddr *url.URL, 
physicalAddr *url.URL, request
                Cnx: cnx,
        }
 
+       var rpcErr error = nil
+
        // TODO: Handle errors with disconnections
-       cnx.SendRequest(requestID, baseCommand(cmdType, message), func(response 
*pb.BaseCommand) {
+       cnx.SendRequest(requestID, baseCommand(cmdType, message), func(response 
*pb.BaseCommand, err error) {
                rpcResult.Response = response
+               rpcErr = err
                wg.Done()
        })
 
        wg.Wait()
-       return rpcResult, nil
+       return rpcResult, rpcErr
 }
 
 func (c *rpcClient) RequestOnCnx(cnx Connection, requestID uint64, cmdType 
pb.BaseCommand_Type,
@@ -103,13 +106,15 @@ func (c *rpcClient) RequestOnCnx(cnx Connection, 
requestID uint64, cmdType pb.Ba
                Cnx: cnx,
        }
 
-       cnx.SendRequest(requestID, baseCommand(cmdType, message), func(response 
*pb.BaseCommand) {
+       var rpcErr error = nil
+       cnx.SendRequest(requestID, baseCommand(cmdType, message), func(response 
*pb.BaseCommand, err error) {
                rpcResult.Response = response
+               rpcErr = err
                wg.Done()
        })
 
        wg.Wait()
-       return rpcResult, nil
+       return rpcResult, rpcErr
 }
 
 func (c *rpcClient) RequestOnCnxNoWait(cnx Connection, requestID uint64, 
cmdType pb.BaseCommand_Type,
@@ -118,7 +123,7 @@ func (c *rpcClient) RequestOnCnxNoWait(cnx Connection, 
requestID uint64, cmdType
                Cnx: cnx,
        }
 
-       cnx.SendRequest(requestID, baseCommand(cmdType, message), func(response 
*pb.BaseCommand) {
+       cnx.SendRequest(requestID, baseCommand(cmdType, message), func(response 
*pb.BaseCommand, err error) {
                rpcResult.Response = response
        })
 

Reply via email to