This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new b354cb8  Implemented the operation timeout check (#139)
b354cb8 is described below

commit b354cb8ae84ab1b25cb293e29ba9cc449b224c69
Author: Matteo Merli <[email protected]>
AuthorDate: Mon Dec 23 09:49:50 2019 -0800

    Implemented the operation timeout check (#139)
---
 pulsar/client_impl.go         |  8 +++++++-
 pulsar/internal/rpc_client.go | 36 ++++++++++++++++++++----------------
 2 files changed, 27 insertions(+), 17 deletions(-)

diff --git a/pulsar/client_impl.go b/pulsar/client_impl.go
index c422d4a..7873b56 100644
--- a/pulsar/client_impl.go
+++ b/pulsar/client_impl.go
@@ -34,6 +34,7 @@ import (
 
 const (
        defaultConnectionTimeout = 30*time.Second
+       defaultOperationTimeout = 30*time.Second
 )
 
 type client struct {
@@ -91,10 +92,15 @@ func newClient(options ClientOptions) (Client, error) {
                connectionTimeout = defaultConnectionTimeout
        }
 
+       operationTimeout := options.OperationTimeout
+       if operationTimeout.Nanoseconds() == 0 {
+               operationTimeout = defaultOperationTimeout
+       }
+
        c := &client{
                cnxPool: internal.NewConnectionPool(tlsConfig, authProvider, connectionTimeout),
        }
-       c.rpcClient = internal.NewRPCClient(url, c.cnxPool)
+       c.rpcClient = internal.NewRPCClient(url, c.cnxPool, operationTimeout)
        c.lookupService = internal.NewLookupService(c.rpcClient, url)
        c.handlers = internal.NewClientHandlers()
        return c, nil
diff --git a/pulsar/internal/rpc_client.go b/pulsar/internal/rpc_client.go
index 860681d..aa89b59 100644
--- a/pulsar/internal/rpc_client.go
+++ b/pulsar/internal/rpc_client.go
@@ -18,9 +18,11 @@
 package internal
 
 import (
+       "errors"
        "net/url"
        "sync"
        "sync/atomic"
+       "time"
 
        "github.com/apache/pulsar-client-go/pulsar/internal/pb"
        "github.com/golang/protobuf/proto"
@@ -53,15 +55,17 @@ type RPCClient interface {
 type rpcClient struct {
        serviceURL          *url.URL
        pool                ConnectionPool
+       requestTimeout      time.Duration
        requestIDGenerator  uint64
        producerIDGenerator uint64
        consumerIDGenerator uint64
 }
 
-func NewRPCClient(serviceURL *url.URL, pool ConnectionPool) RPCClient {
+func NewRPCClient(serviceURL *url.URL, pool ConnectionPool, requestTimeout time.Duration) RPCClient {
        return &rpcClient{
-               serviceURL: serviceURL,
-               pool:       pool,
+               serviceURL:     serviceURL,
+               pool:           pool,
+               requestTimeout: requestTimeout,
        }
 }
 
@@ -77,24 +81,24 @@ func (c *rpcClient) Request(logicalAddr *url.URL, physicalAddr *url.URL, request
                return nil, err
        }
 
-       wg := sync.WaitGroup{}
-       wg.Add(1)
-
-       rpcResult := &RPCResult{
-               Cnx: cnx,
-       }
-
-       var rpcErr error = nil
+       type Res struct {*RPCResult; error}
+       ch := make(chan Res)
 
        // TODO: Handle errors with disconnections
        cnx.SendRequest(requestID, baseCommand(cmdType, message), func(response *pb.BaseCommand, err error) {
-               rpcResult.Response = response
-               rpcErr = err
-               wg.Done()
+               ch <- Res{&RPCResult{
+                       Cnx:      cnx,
+                       Response: response,
+               }, err}
+               close(ch)
        })
 
-       wg.Wait()
-       return rpcResult, rpcErr
+       select {
+       case res := <-ch:
+               return res.RPCResult, res.error
+       case <-time.After(c.requestTimeout):
+               return nil, errors.New("request timed out")
+       }
 }
 
 func (c *rpcClient) RequestOnCnx(cnx Connection, requestID uint64, cmdType pb.BaseCommand_Type,

Reply via email to