This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new cfe87b3  Use the specified connection timeout when creating 
connections (#137)
cfe87b3 is described below

commit cfe87b3a03d3b1dce761c02ddc8776e9def222ee
Author: Matteo Merli <[email protected]>
AuthorDate: Thu Dec 19 18:45:05 2019 -0800

    Use the specified connection timeout when creating connections (#137)
    
    * Use the specified connection timeout when creating connections
    
    * Fixed compile and other errors
---
 pulsar/client.go                   |  1 +
 pulsar/client_impl.go              | 12 +++++++++++-
 pulsar/internal/connection.go      | 14 +++++++++-----
 pulsar/internal/connection_pool.go | 17 ++++++++++-------
 4 files changed, 31 insertions(+), 13 deletions(-)

diff --git a/pulsar/client.go b/pulsar/client.go
index 31257ed..5a98496 100644
--- a/pulsar/client.go
+++ b/pulsar/client.go
@@ -63,6 +63,7 @@ type ClientOptions struct {
        // This parameter is required
        URL string
 
+       // Timeout for the establishment of a TCP connection (default: 30 
seconds)
        ConnectionTimeout time.Duration
 
        // Set the operation timeout (default: 30 seconds)
diff --git a/pulsar/client_impl.go b/pulsar/client_impl.go
index 6e8a910..c422d4a 100644
--- a/pulsar/client_impl.go
+++ b/pulsar/client_impl.go
@@ -21,6 +21,7 @@ import (
        "errors"
        "fmt"
        "net/url"
+       "time"
 
        "github.com/golang/protobuf/proto"
 
@@ -31,6 +32,10 @@ import (
        "github.com/apache/pulsar-client-go/pulsar/internal/pb"
 )
 
+const (
+       defaultConnectionTimeout = 30*time.Second
+)
+
 type client struct {
        options ClientOptions
 
@@ -81,8 +86,13 @@ func newClient(options ClientOptions) (Client, error) {
                }
        }
 
+       connectionTimeout := options.ConnectionTimeout
+       if connectionTimeout.Nanoseconds() == 0 {
+               connectionTimeout = defaultConnectionTimeout
+       }
+
        c := &client{
-               cnxPool: internal.NewConnectionPool(tlsConfig, authProvider),
+               cnxPool: internal.NewConnectionPool(tlsConfig, authProvider, 
connectionTimeout),
        }
        c.rpcClient = internal.NewRPCClient(url, c.cnxPool)
        c.lookupService = internal.NewLookupService(c.rpcClient, url)
diff --git a/pulsar/internal/connection.go b/pulsar/internal/connection.go
index e18840c..a42be14 100644
--- a/pulsar/internal/connection.go
+++ b/pulsar/internal/connection.go
@@ -115,8 +115,9 @@ type incomingCmd struct {
 
 type connection struct {
        sync.Mutex
-       cond  *sync.Cond
-       state connectionState
+       cond              *sync.Cond
+       state             connectionState
+       connectionTimeout time.Duration
 
        logicalAddr  *url.URL
        physicalAddr *url.URL
@@ -150,9 +151,11 @@ type connection struct {
        auth       auth.Provider
 }
 
-func newConnection(logicalAddr *url.URL, physicalAddr *url.URL, tlsOptions 
*TLSOptions, auth auth.Provider) *connection {
+func newConnection(logicalAddr *url.URL, physicalAddr *url.URL, tlsOptions 
*TLSOptions,
+       connectionTimeout time.Duration, auth auth.Provider) *connection {
        cnx := &connection{
                state:                connectionInit,
+               connectionTimeout:    connectionTimeout,
                logicalAddr:          logicalAddr,
                physicalAddr:         physicalAddr,
                writeBuffer:          NewBuffer(4096),
@@ -202,7 +205,7 @@ func (c *connection) connect() bool {
 
        if c.tlsOptions == nil {
                // Clear text connection
-               cnx, err = net.Dial("tcp", c.physicalAddr.Host)
+               cnx, err = net.DialTimeout("tcp", c.physicalAddr.Host, 
c.connectionTimeout)
        } else {
                // TLS connection
                tlsConfig, err = c.getTLSConfig()
@@ -211,7 +214,8 @@ func (c *connection) connect() bool {
                        return false
                }
 
-               cnx, err = tls.Dial("tcp", c.physicalAddr.Host, tlsConfig)
+               d := &net.Dialer{Timeout: c.connectionTimeout}
+               cnx, err = tls.DialWithDialer(d, "tcp", c.physicalAddr.Host, 
tlsConfig)
        }
 
        if err != nil {
diff --git a/pulsar/internal/connection_pool.go 
b/pulsar/internal/connection_pool.go
index e07b23b..14f7753 100644
--- a/pulsar/internal/connection_pool.go
+++ b/pulsar/internal/connection_pool.go
@@ -20,6 +20,7 @@ package internal
 import (
        "net/url"
        "sync"
+       "time"
 
        "github.com/apache/pulsar-client-go/pulsar/internal/auth"
 
@@ -36,16 +37,18 @@ type ConnectionPool interface {
 }
 
 type connectionPool struct {
-       pool       sync.Map
-       tlsOptions *TLSOptions
-       auth       auth.Provider
+       pool              sync.Map
+       connectionTimeout time.Duration
+       tlsOptions        *TLSOptions
+       auth              auth.Provider
 }
 
 // NewConnectionPool init connection pool.
-func NewConnectionPool(tlsOptions *TLSOptions, auth auth.Provider) 
ConnectionPool {
+func NewConnectionPool(tlsOptions *TLSOptions, auth auth.Provider, 
connectionTimeout time.Duration) ConnectionPool {
        return &connectionPool{
-               tlsOptions: tlsOptions,
-               auth:       auth,
+               tlsOptions:        tlsOptions,
+               auth:              auth,
+               connectionTimeout: connectionTimeout,
        }
 }
 
@@ -66,7 +69,7 @@ func (p *connectionPool) GetConnection(logicalAddr *url.URL, 
physicalAddr *url.U
 
        // Try to create a new connection
        newCnx, wasCached := p.pool.LoadOrStore(logicalAddr.Host,
-               newConnection(logicalAddr, physicalAddr, p.tlsOptions, p.auth))
+               newConnection(logicalAddr, physicalAddr, p.tlsOptions, 
p.connectionTimeout, p.auth))
        cnx := newCnx.(*connection)
        if !wasCached {
                cnx.start()

Reply via email to