cckellogg commented on a change in pull request #559:
URL: https://github.com/apache/pulsar-client-go/pull/559#discussion_r661916928
##########
File path: pulsar/internal/rpc_client.go
##########
@@ -88,37 +87,30 @@ func NewRPCClient(serviceURL *url.URL, serviceNameResolver
ServiceNameResolver,
func (c *rpcClient) RequestToAnyBroker(requestID uint64, cmdType
pb.BaseCommand_Type,
message proto.Message) (*RPCResult, error) {
- host, err := c.serviceNameResolver.ResolveHost()
- if err != nil {
- c.log.Errorf("request host resolve failed with error: {%v}",
err)
- return nil, err
- }
- rpcResult, err := c.Request(host, host, requestID, cmdType, message)
- if _, ok := err.(net.Error); ok || (err != nil && err.Error() ==
"connection error") {
- // We can retry this kind of requests over a connection error
because they're
- // not specific to a particular broker.
- backoff := Backoff{100 * time.Millisecond}
- startTime := time.Now()
- var retryTime time.Duration
-
- for time.Since(startTime) < c.requestTimeout {
- retryTime = backoff.Next()
- c.log.Debugf("Retrying request in {%v} with timeout in
{%v}", retryTime, c.requestTimeout)
- time.Sleep(retryTime)
- host, err = c.serviceNameResolver.ResolveHost()
- if err != nil {
- c.log.Errorf("Retrying request host resolve
failed with error: {%v}", err)
- continue
- }
- rpcResult, err = c.Request(host, host, requestID,
cmdType, message)
- if _, ok := err.(net.Error); ok || (err != nil &&
err.Error() == "connection error") {
- continue
- } else {
- // We either succeeded or encountered a non
connection error
- break
- }
+ var err error
+ var host *url.URL
+ var rpcResult *RPCResult
+ startTime := time.Now()
+ backoff := Backoff{100 * time.Millisecond}
+ // we can retry these requests because this kind of request is
+ // not specific to any particular broker
+ for time.Since(startTime) < c.requestTimeout {
+ host, err = c.serviceNameResolver.ResolveHost()
+ if err != nil {
+ c.log.WithError(err).Errorf("rpc client failed to
resolve host")
+ return nil, err
}
+ rpcResult, err = c.Request(host, host, requestID, cmdType,
message)
+ // success we got a response
+ if err == nil {
+ break
+ }
+
+ retryTime := backoff.Next()
+ c.log.Debugf("Retrying request in {%v} with timeout in {%v}",
retryTime, c.requestTimeout)
+ time.Sleep(retryTime)
}
+
Review comment:
We could also do something like
```
var err error
var host *url.URL
var rpcResult *RPCResult
backoff := Backoff{100 * time.Millisecond}
var timeoutCh <-chan time.Time
for {
host, err = c.serviceNameResolver.ResolveHost()
if err != nil {
c.log.WithError(err).Errorf("rpc client failed to
resolve host")
return nil, err
}
rpcResult, err = c.Request(host, host, requestID, cmdType,
message)
// success we got a response
if err == nil {
break
}
retryTime := backoff.Next()
sleepCh := time.After(retryTime)
if timeoutCh == nil {
timeoutCh = time.After(c.requestTimeout)
}
c.log.Debugf("Retrying request in {%v} with timeout in {%v}",
retryTime, c.requestTimeout)
select {
case <- sleepCh:
break
case <- timeoutCh:
return rpcResult, err
}
}
```
And we may want to think about adding a context (in another PR) if can we
need to cancel this when shutting down producer or consumer on reconnect?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]