someview opened a new issue, #1147:
URL: https://github.com/apache/pulsar-client-go/issues/1147
**Is your feature request related to a problem? Please describe.**
Add context control for the publish command, and add an async method.
1. In Go, there are many reasons to use a context to control a request.
2. Go has no keyword like `await`, but sometimes we want a non-blocking call,
e.g. publishing a message without needing to receive its result.
```
func (c *rpcClient) RequestToAnyBroker(requestID uint64, cmdType pb.BaseCommand_Type,
	message proto.Message) (*RPCResult, error) {
	var err error
	var host *url.URL
	var rpcResult *RPCResult
	startTime := time.Now()
	backoff := DefaultBackoff{100 * time.Millisecond}
	// we can retry these requests because this kind of request is
	// not specific to any particular broker
	for time.Since(startTime) < c.requestTimeout {
		host, err = c.serviceNameResolver.ResolveHost()
		if err != nil {
			c.log.WithError(err).Errorf("rpc client failed to resolve host")
			return nil, err
		}
		rpcResult, err = c.Request(host, host, requestID, cmdType, message)
		// success we got a response
		if err == nil {
			break
		}
		retryTime := backoff.Next()
		c.log.Debugf("Retrying request in {%v} with timeout in {%v}", retryTime, c.requestTimeout)
		time.Sleep(retryTime)
	}
	return rpcResult, err
}

func (c *rpcClient) Request(logicalAddr *url.URL, physicalAddr *url.URL, requestID uint64,
	cmdType pb.BaseCommand_Type, message proto.Message) (*RPCResult, error) {
	c.metrics.RPCRequestCount.Inc()
	cnx, err := c.pool.GetConnection(logicalAddr, physicalAddr)
	if err != nil {
		return nil, err
	}
	ch := make(chan result, 1)
	cnx.SendRequest(requestID, baseCommand(cmdType, message), func(response *pb.BaseCommand, err error) {
		ch <- result{&RPCResult{
			Cnx:      cnx,
			Response: response,
		}, err}
	})
	timeoutCh := time.After(c.requestTimeout)
	for {
		select {
		case res := <-ch:
			// Ignoring producer not ready response.
			// Continue to wait for the producer to create successfully
			if res.error == nil && *res.RPCResult.Response.Type == pb.BaseCommand_PRODUCER_SUCCESS {
				if !res.RPCResult.Response.ProducerSuccess.GetProducerReady() {
					timeoutCh = nil
					break
				}
			}
			return res.RPCResult, res.error
		case <-timeoutCh:
			return nil, ErrRequestTimeOut
		}
	}
}
```
**Describe the solution you'd like**
The code above is confusing. Retry logic should be separated from a single request,
e.g.:
```
func (c *rpcClient) WithRetry(fn func()) {
}

// This would avoid the structure of the code above, which, with the Request
// method inlined into RequestToAnyBroker, amounts to nested loops:
for {
	for {
		select {
		case <-ch1:
		case <-ch2:
		}
	}
}
```