cckellogg commented on a change in pull request #559:
URL: https://github.com/apache/pulsar-client-go/pull/559#discussion_r661916928



##########
File path: pulsar/internal/rpc_client.go
##########
@@ -88,37 +87,30 @@ func NewRPCClient(serviceURL *url.URL, serviceNameResolver 
ServiceNameResolver,
 
 func (c *rpcClient) RequestToAnyBroker(requestID uint64, cmdType 
pb.BaseCommand_Type,
        message proto.Message) (*RPCResult, error) {
-       host, err := c.serviceNameResolver.ResolveHost()
-       if err != nil {
-               c.log.Errorf("request host resolve failed with error: {%v}", 
err)
-               return nil, err
-       }
-       rpcResult, err := c.Request(host, host, requestID, cmdType, message)
-       if _, ok := err.(net.Error); ok || (err != nil && err.Error() == 
"connection error") {
-               // We can retry this kind of requests over a connection error 
because they're
-               // not specific to a particular broker.
-               backoff := Backoff{100 * time.Millisecond}
-               startTime := time.Now()
-               var retryTime time.Duration
-
-               for time.Since(startTime) < c.requestTimeout {
-                       retryTime = backoff.Next()
-                       c.log.Debugf("Retrying request in {%v} with timeout in 
{%v}", retryTime, c.requestTimeout)
-                       time.Sleep(retryTime)
-                       host, err = c.serviceNameResolver.ResolveHost()
-                       if err != nil {
-                               c.log.Errorf("Retrying request host resolve 
failed with error: {%v}", err)
-                               continue
-                       }
-                       rpcResult, err = c.Request(host, host, requestID, 
cmdType, message)
-                       if _, ok := err.(net.Error); ok || (err != nil && 
err.Error() == "connection error") {
-                               continue
-                       } else {
-                               // We either succeeded or encountered a non 
connection error
-                               break
-                       }
+       var err error
+       var host *url.URL
+       var rpcResult *RPCResult
+       startTime := time.Now()
+       backoff := Backoff{100 * time.Millisecond}
+       // we can retry these requests because this kind of request is
+       // not specific to any particular broker
+       for time.Since(startTime) < c.requestTimeout {
+               host, err = c.serviceNameResolver.ResolveHost()
+               if err != nil {
+                       c.log.WithError(err).Errorf("rpc client failed to 
resolve host")
+                       return nil, err
                }
+               rpcResult, err = c.Request(host, host, requestID, cmdType, 
message)
+               // success we got a response
+               if err == nil {
+                       break
+               }
+
+               retryTime := backoff.Next()
+               c.log.Debugf("Retrying request in {%v} with timeout in {%v}", 
retryTime, c.requestTimeout)
+               time.Sleep(retryTime)
        }
+

Review comment:
       We could also do something like this:
   ```
           var err error
        var host *url.URL
        var rpcResult *RPCResult
        backoff := Backoff{100 * time.Millisecond}
        var timeoutCh <-chan time.Time
        for {
                host, err = c.serviceNameResolver.ResolveHost()
                if err != nil {
                        c.log.WithError(err).Errorf("rpc client failed to 
resolve host")
                        return nil, err
                }
                rpcResult, err = c.Request(host, host, requestID, cmdType, 
message)
                // success we got a response
                if err == nil {
                        break
                }
   
                retryTime := backoff.Next()
                sleepCh := time.After(retryTime)
                if timeoutCh == nil {
                        timeoutCh = time.After(c.requestTimeout)
                }
                c.log.Debugf("Retrying request in {%v} with timeout in {%v}", 
retryTime, c.requestTimeout)
                select {
                case <- sleepCh:
                        break
                case <- timeoutCh:
                        return rpcResult, err
                }
        }
   ```
   
   We may also want to consider adding a context (in a separate PR) so that we can cancel this retry loop when the producer or consumer is shut down or reconnects.
   




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to