This is an automated email from the ASF dual-hosted git repository.
rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new cfe87b3 Use the specified connection timeout when creating
connections (#137)
cfe87b3 is described below
commit cfe87b3a03d3b1dce761c02ddc8776e9def222ee
Author: Matteo Merli <[email protected]>
AuthorDate: Thu Dec 19 18:45:05 2019 -0800
Use the specified connection timeout when creating connections (#137)
* Use the specified connection timeout when creating connections
* Fixed compile and other errors
---
pulsar/client.go | 1 +
pulsar/client_impl.go | 12 +++++++++++-
pulsar/internal/connection.go | 14 +++++++++-----
pulsar/internal/connection_pool.go | 17 ++++++++++-------
4 files changed, 31 insertions(+), 13 deletions(-)
diff --git a/pulsar/client.go b/pulsar/client.go
index 31257ed..5a98496 100644
--- a/pulsar/client.go
+++ b/pulsar/client.go
@@ -63,6 +63,7 @@ type ClientOptions struct {
// This parameter is required
URL string
+ // Timeout for the establishment of a TCP connection (default: 30
seconds)
ConnectionTimeout time.Duration
// Set the operation timeout (default: 30 seconds)
diff --git a/pulsar/client_impl.go b/pulsar/client_impl.go
index 6e8a910..c422d4a 100644
--- a/pulsar/client_impl.go
+++ b/pulsar/client_impl.go
@@ -21,6 +21,7 @@ import (
"errors"
"fmt"
"net/url"
+ "time"
"github.com/golang/protobuf/proto"
@@ -31,6 +32,10 @@ import (
"github.com/apache/pulsar-client-go/pulsar/internal/pb"
)
+const (
+ defaultConnectionTimeout = 30*time.Second
+)
+
type client struct {
options ClientOptions
@@ -81,8 +86,13 @@ func newClient(options ClientOptions) (Client, error) {
}
}
+ connectionTimeout := options.ConnectionTimeout
+ if connectionTimeout.Nanoseconds() == 0 {
+ connectionTimeout = defaultConnectionTimeout
+ }
+
c := &client{
- cnxPool: internal.NewConnectionPool(tlsConfig, authProvider),
+ cnxPool: internal.NewConnectionPool(tlsConfig, authProvider,
connectionTimeout),
}
c.rpcClient = internal.NewRPCClient(url, c.cnxPool)
c.lookupService = internal.NewLookupService(c.rpcClient, url)
diff --git a/pulsar/internal/connection.go b/pulsar/internal/connection.go
index e18840c..a42be14 100644
--- a/pulsar/internal/connection.go
+++ b/pulsar/internal/connection.go
@@ -115,8 +115,9 @@ type incomingCmd struct {
type connection struct {
sync.Mutex
- cond *sync.Cond
- state connectionState
+ cond *sync.Cond
+ state connectionState
+ connectionTimeout time.Duration
logicalAddr *url.URL
physicalAddr *url.URL
@@ -150,9 +151,11 @@ type connection struct {
auth auth.Provider
}
-func newConnection(logicalAddr *url.URL, physicalAddr *url.URL, tlsOptions
*TLSOptions, auth auth.Provider) *connection {
+func newConnection(logicalAddr *url.URL, physicalAddr *url.URL, tlsOptions
*TLSOptions,
+ connectionTimeout time.Duration, auth auth.Provider) *connection {
cnx := &connection{
state: connectionInit,
+ connectionTimeout: connectionTimeout,
logicalAddr: logicalAddr,
physicalAddr: physicalAddr,
writeBuffer: NewBuffer(4096),
@@ -202,7 +205,7 @@ func (c *connection) connect() bool {
if c.tlsOptions == nil {
// Clear text connection
- cnx, err = net.Dial("tcp", c.physicalAddr.Host)
+ cnx, err = net.DialTimeout("tcp", c.physicalAddr.Host,
c.connectionTimeout)
} else {
// TLS connection
tlsConfig, err = c.getTLSConfig()
@@ -211,7 +214,8 @@ func (c *connection) connect() bool {
return false
}
- cnx, err = tls.Dial("tcp", c.physicalAddr.Host, tlsConfig)
+ d := &net.Dialer{Timeout: c.connectionTimeout}
+ cnx, err = tls.DialWithDialer(d, "tcp", c.physicalAddr.Host,
tlsConfig)
}
if err != nil {
diff --git a/pulsar/internal/connection_pool.go
b/pulsar/internal/connection_pool.go
index e07b23b..14f7753 100644
--- a/pulsar/internal/connection_pool.go
+++ b/pulsar/internal/connection_pool.go
@@ -20,6 +20,7 @@ package internal
import (
"net/url"
"sync"
+ "time"
"github.com/apache/pulsar-client-go/pulsar/internal/auth"
@@ -36,16 +37,18 @@ type ConnectionPool interface {
}
type connectionPool struct {
- pool sync.Map
- tlsOptions *TLSOptions
- auth auth.Provider
+ pool sync.Map
+ connectionTimeout time.Duration
+ tlsOptions *TLSOptions
+ auth auth.Provider
}
// NewConnectionPool init connection pool.
-func NewConnectionPool(tlsOptions *TLSOptions, auth auth.Provider)
ConnectionPool {
+func NewConnectionPool(tlsOptions *TLSOptions, auth auth.Provider,
connectionTimeout time.Duration) ConnectionPool {
return &connectionPool{
- tlsOptions: tlsOptions,
- auth: auth,
+ tlsOptions: tlsOptions,
+ auth: auth,
+ connectionTimeout: connectionTimeout,
}
}
@@ -66,7 +69,7 @@ func (p *connectionPool) GetConnection(logicalAddr *url.URL,
physicalAddr *url.U
// Try to create a new connection
newCnx, wasCached := p.pool.LoadOrStore(logicalAddr.Host,
- newConnection(logicalAddr, physicalAddr, p.tlsOptions, p.auth))
+ newConnection(logicalAddr, physicalAddr, p.tlsOptions,
p.connectionTimeout, p.auth))
cnx := newCnx.(*connection)
if !wasCached {
cnx.start()