someview opened a new issue, #1147:
URL: https://github.com/apache/pulsar-client-go/issues/1147

   **Is your feature request related to a problem? Please describe.**
   Add context control for the publish command and add an async method.
   1. In Go, there are many reasons to use a context to control a request.
   2. Go has no keyword like `await`, but sometimes we want a non-blocking call, 
e.g. publishing a message without needing to receive its result. 
   ```
   func (c *rpcClient) RequestToAnyBroker(requestID uint64, cmdType 
pb.BaseCommand_Type,
        message proto.Message) (*RPCResult, error) {
        var err error
        var host *url.URL
        var rpcResult *RPCResult
        startTime := time.Now()
        backoff := DefaultBackoff{100 * time.Millisecond}
        // we can retry these requests because this kind of request is
        // not specific to any particular broker
        for time.Since(startTime) < c.requestTimeout {
                host, err = c.serviceNameResolver.ResolveHost()
                if err != nil {
                        c.log.WithError(err).Errorf("rpc client failed to 
resolve host")
                        return nil, err
                }
                rpcResult, err = c.Request(host, host, requestID, cmdType, 
message)
                // success we got a response
                if err == nil {
                        break
                }
   
                retryTime := backoff.Next()
                c.log.Debugf("Retrying request in {%v} with timeout in {%v}", 
retryTime, c.requestTimeout)
                time.Sleep(retryTime)
        }
   
        return rpcResult, err
   }
   
   
   func (c *rpcClient) Request(logicalAddr *url.URL, physicalAddr *url.URL, 
requestID uint64,
        cmdType pb.BaseCommand_Type, message proto.Message) (*RPCResult, error) 
{
        c.metrics.RPCRequestCount.Inc()
        cnx, err := c.pool.GetConnection(logicalAddr, physicalAddr)
        if err != nil {
                return nil, err
        }
   
        ch := make(chan result, 1)
   
        cnx.SendRequest(requestID, baseCommand(cmdType, message), func(response 
*pb.BaseCommand, err error) {
                ch <- result{&RPCResult{
                        Cnx:      cnx,
                        Response: response,
                }, err}
        })
   
        timeoutCh := time.After(c.requestTimeout)
        for {
                select {
                case res := <-ch:
                        // Ignoring producer not ready response.
                        // Continue to wait for the producer to create 
successfully
                        if res.error == nil && *res.RPCResult.Response.Type == 
pb.BaseCommand_PRODUCER_SUCCESS {
                                if 
!res.RPCResult.Response.ProducerSuccess.GetProducerReady() {
                                        timeoutCh = nil
                                        break
                                }
                        }
                        return res.RPCResult, res.error
                case <-timeoutCh:
                        return nil, ErrRequestTimeOut
                }
        }
   }
   ```
   
   **Describe the solution you'd like**
    The above code is confusing. Retry logic should be separated from a single 
request, e.g.:
    ```
   func (c *rpcClient) WithRetry(func()){
   }
   This would avoid the structure above, which, with the Request method inlined 
into RequestToAnyBroker, amounts to:
   for{
      for{
         select{
             case <-ch1:
             case <- ch2:
        }
      }
   }
    ```
    
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to