This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new c979046  Allow to have multiple connections per broker (#276)
c979046 is described below

commit c979046238a71838ee7bf5a6ff6f74a725d8f6b7
Author: Matteo Merli <[email protected]>
AuthorDate: Wed Jun 10 22:38:27 2020 -0700

    Allow to have multiple connections per broker (#276)
    
    - Allow to have multiple connections per broker
---
 pulsar/client.go                   |  3 +++
 pulsar/client_impl.go              |  7 ++++++-
 pulsar/internal/connection_pool.go | 42 ++++++++++++++++++++++++++++----------
 3 files changed, 40 insertions(+), 12 deletions(-)

diff --git a/pulsar/client.go b/pulsar/client.go
index 53de468..d4af906 100644
--- a/pulsar/client.go
+++ b/pulsar/client.go
@@ -88,6 +88,9 @@ type ClientOptions struct {
 
        // Configure whether the Pulsar client verify the validity of the host 
name from broker (default: false)
        TLSValidateHostname bool
+
+       // Max number of connections to a single broker that will be kept in the 
pool. (Default: 1 connection)
+       MaxConnectionsPerBroker int
 }
 
 type Client interface {
diff --git a/pulsar/client_impl.go b/pulsar/client_impl.go
index 02d9883..d731183 100644
--- a/pulsar/client_impl.go
+++ b/pulsar/client_impl.go
@@ -95,8 +95,13 @@ func newClient(options ClientOptions) (Client, error) {
                operationTimeout = defaultOperationTimeout
        }
 
+       maxConnectionsPerHost := options.MaxConnectionsPerBroker
+       if maxConnectionsPerHost <= 0 {
+               maxConnectionsPerHost = 1
+       }
+
        c := &client{
-               cnxPool: internal.NewConnectionPool(tlsConfig, authProvider, 
connectionTimeout),
+               cnxPool: internal.NewConnectionPool(tlsConfig, authProvider, 
connectionTimeout, maxConnectionsPerHost),
        }
        c.rpcClient = internal.NewRPCClient(url, c.cnxPool, operationTimeout)
        c.lookupService = internal.NewLookupService(c.rpcClient, url, tlsConfig 
!= nil)
diff --git a/pulsar/internal/connection_pool.go 
b/pulsar/internal/connection_pool.go
index b91324a..a90ac42 100644
--- a/pulsar/internal/connection_pool.go
+++ b/pulsar/internal/connection_pool.go
@@ -18,8 +18,10 @@
 package internal
 
 import (
+       "fmt"
        "net/url"
        "sync"
+       "sync/atomic"
        "time"
 
        "github.com/apache/pulsar-client-go/pulsar/internal/auth"
@@ -37,23 +39,31 @@ type ConnectionPool interface {
 }
 
 type connectionPool struct {
-       pool              sync.Map
-       connectionTimeout time.Duration
-       tlsOptions        *TLSOptions
-       auth              auth.Provider
+       pool                  sync.Map
+       connectionTimeout     time.Duration
+       tlsOptions            *TLSOptions
+       auth                  auth.Provider
+       maxConnectionsPerHost int32
+       roundRobinCnt         int32
 }
 
 // NewConnectionPool init connection pool.
-func NewConnectionPool(tlsOptions *TLSOptions, auth auth.Provider, 
connectionTimeout time.Duration) ConnectionPool {
+func NewConnectionPool(
+       tlsOptions *TLSOptions,
+       auth auth.Provider,
+       connectionTimeout time.Duration,
+       maxConnectionsPerHost int) ConnectionPool {
        return &connectionPool{
-               tlsOptions:        tlsOptions,
-               auth:              auth,
-               connectionTimeout: connectionTimeout,
+               tlsOptions:            tlsOptions,
+               auth:                  auth,
+               connectionTimeout:     connectionTimeout,
+               maxConnectionsPerHost: int32(maxConnectionsPerHost),
        }
 }
 
 func (p *connectionPool) GetConnection(logicalAddr *url.URL, physicalAddr 
*url.URL) (Connection, error) {
-       cachedCnx, found := p.pool.Load(logicalAddr.Host)
+       key := p.getMapKey(logicalAddr)
+       cachedCnx, found := p.pool.Load(key)
        if found {
                cnx := cachedCnx.(*connection)
                log.Debug("Found connection in cache:", cnx.logicalAddr, 
cnx.physicalAddr)
@@ -63,14 +73,15 @@ func (p *connectionPool) GetConnection(logicalAddr 
*url.URL, physicalAddr *url.U
                        return cnx, nil
                }
                // The cached connection is failed
-               p.pool.Delete(logicalAddr.Host)
+               p.pool.Delete(key)
                log.Debug("Removed failed connection from pool:", 
cnx.logicalAddr, cnx.physicalAddr)
        }
 
        // Try to create a new connection
        newConnection := newConnection(logicalAddr, physicalAddr, p.tlsOptions, 
p.connectionTimeout, p.auth)
-       newCnx, wasCached := p.pool.LoadOrStore(logicalAddr.Host, newConnection)
+       newCnx, wasCached := p.pool.LoadOrStore(key, newConnection)
        cnx := newCnx.(*connection)
+
        if !wasCached {
                cnx.start()
        } else {
@@ -89,3 +100,12 @@ func (p *connectionPool) Close() {
                return true
        })
 }
+
+// getMapKey returns the pool key "<host>-<idx>" for addr; idx advances
+// round-robin on every call and is reduced modulo maxConnectionsPerHost.
+func (p *connectionPool) getMapKey(addr *url.URL) string {
+       // Clear the sign bit instead of negating: -math.MinInt32 overflows.
+       cnt := atomic.AddInt32(&p.roundRobinCnt, 1) & 0x7fffffff
+       idx := cnt % p.maxConnectionsPerHost
+       return fmt.Sprint(addr.Host, "-", idx) // "-" string; rune '-' prints 45
+}

Reply via email to