This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new d6b14c6  Fixed the connection pool logic (#161)
d6b14c6 is described below

commit d6b14c66b81c0d2c00cf21587e7e7042c5fa1f56
Author: Matteo Merli <[email protected]>
AuthorDate: Sun Jan 5 05:17:02 2020 -0800

    Fixed the connection pool logic (#161)
    
    * Fixed the connection pool logic
    
    * Fixed mocked object for tests
    
    * Fixed indentation
---
 pulsar/consumer_partition.go           |  2 +-
 pulsar/internal/connection.go          | 37 +++++++++++++++++-----------------
 pulsar/internal/connection_pool.go     | 12 +++++------
 pulsar/internal/lookup_service.go      | 12 +++++------
 pulsar/internal/lookup_service_test.go |  2 +-
 pulsar/internal/rpc_client.go          |  8 ++++----
 pulsar/producer_partition.go           |  3 +--
 7 files changed, 35 insertions(+), 41 deletions(-)

diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go
index 9d47581..55d1318 100644
--- a/pulsar/consumer_partition.go
+++ b/pulsar/consumer_partition.go
@@ -530,7 +530,7 @@ func (pc *partitionConsumer) grabConn() error {
        }
 
        res, err := pc.client.rpcClient.Request(lr.LogicalAddr, 
lr.PhysicalAddr, requestID,
-               pb.BaseCommand_SUBSCRIBE, cmdSubscribe, 
lr.ConnectingThroughProxy)
+               pb.BaseCommand_SUBSCRIBE, cmdSubscribe)
 
        if err != nil {
                pc.log.WithError(err).Error("Failed to create consumer")
diff --git a/pulsar/internal/connection.go b/pulsar/internal/connection.go
index 590b92f..b50ef12 100644
--- a/pulsar/internal/connection.go
+++ b/pulsar/internal/connection.go
@@ -119,10 +119,9 @@ type connection struct {
        state             connectionState
        connectionTimeout time.Duration
 
-       logicalAddr            *url.URL
-       physicalAddr           *url.URL
-       connectingThroughProxy bool
-       cnx                    net.Conn
+       logicalAddr  *url.URL
+       physicalAddr *url.URL
+       cnx          net.Conn
 
        writeBufferLock sync.Mutex
        writeBuffer     Buffer
@@ -153,21 +152,20 @@ type connection struct {
 }
 
 func newConnection(logicalAddr *url.URL, physicalAddr *url.URL, tlsOptions 
*TLSOptions,
-       connectionTimeout time.Duration, auth auth.Provider, 
connectingThroughProxy bool) *connection {
+       connectionTimeout time.Duration, auth auth.Provider) *connection {
        cnx := &connection{
-               state:                  connectionInit,
-               connectionTimeout:      connectionTimeout,
-               logicalAddr:            logicalAddr,
-               physicalAddr:           physicalAddr,
-               connectingThroughProxy: connectingThroughProxy,
-               writeBuffer:            NewBuffer(4096),
-               log:                    log.WithField("remote_addr", 
physicalAddr),
-               pendingReqs:            make(map[uint64]*request),
-               lastDataReceivedTime:   time.Now(),
-               pingTicker:             time.NewTicker(keepAliveInterval),
-               pingCheckTicker:        time.NewTicker(keepAliveInterval),
-               tlsOptions:             tlsOptions,
-               auth:                   auth,
+               state:                connectionInit,
+               connectionTimeout:    connectionTimeout,
+               logicalAddr:          logicalAddr,
+               physicalAddr:         physicalAddr,
+               writeBuffer:          NewBuffer(4096),
+               log:                  log.WithField("remote_addr", 
physicalAddr),
+               pendingReqs:          make(map[uint64]*request),
+               lastDataReceivedTime: time.Now(),
+               pingTicker:           time.NewTicker(keepAliveInterval),
+               pingCheckTicker:      time.NewTicker(keepAliveInterval),
+               tlsOptions:           tlsOptions,
+               auth:                 auth,
 
                closeCh:            make(chan interface{}),
                incomingRequestsCh: make(chan *request, 10),
@@ -256,7 +254,8 @@ func (c *connection) doHandshake() bool {
                AuthMethodName:  proto.String(c.auth.Name()),
                AuthData:        authData,
        }
-       if c.connectingThroughProxy {
+
+       if c.logicalAddr.Host != c.physicalAddr.Host {
                cmdConnect.ProxyToBrokerUrl = proto.String(c.logicalAddr.Host)
        }
        c.writeCommand(baseCommand(pb.BaseCommand_CONNECT, cmdConnect))
diff --git a/pulsar/internal/connection_pool.go 
b/pulsar/internal/connection_pool.go
index efb5cf1..14f7753 100644
--- a/pulsar/internal/connection_pool.go
+++ b/pulsar/internal/connection_pool.go
@@ -18,7 +18,6 @@
 package internal
 
 import (
-       "fmt"
        "net/url"
        "sync"
        "time"
@@ -31,7 +30,7 @@ import (
 // ConnectionPool is a interface of connection pool.
 type ConnectionPool interface {
        // GetConnection get a connection from ConnectionPool.
-       GetConnection(logicalAddr *url.URL, physicalAddr *url.URL, 
connectingThroughProxy bool) (Connection, error)
+       GetConnection(logicalAddr *url.URL, physicalAddr *url.URL) (Connection, 
error)
 
        // Close all the connections in the pool
        Close()
@@ -53,9 +52,8 @@ func NewConnectionPool(tlsOptions *TLSOptions, auth 
auth.Provider, connectionTim
        }
 }
 
-func (p *connectionPool) GetConnection(logicalAddr *url.URL, physicalAddr 
*url.URL,
-       connectingThroughProxy bool) (Connection, error) {
-       cachedCnx, found := p.pool.Load(fmt.Sprintf("%s:%v", logicalAddr.Host, 
connectingThroughProxy))
+func (p *connectionPool) GetConnection(logicalAddr *url.URL, physicalAddr 
*url.URL) (Connection, error) {
+       cachedCnx, found := p.pool.Load(logicalAddr.Host)
        if found {
                cnx := cachedCnx.(*connection)
                log.Debug("Found connection in cache:", cnx.logicalAddr, 
cnx.physicalAddr)
@@ -70,8 +68,8 @@ func (p *connectionPool) GetConnection(logicalAddr *url.URL, 
physicalAddr *url.U
        }
 
        // Try to create a new connection
-       newCnx, wasCached := p.pool.LoadOrStore(fmt.Sprintf("%s:%v", 
logicalAddr.Host, connectingThroughProxy),
-               newConnection(logicalAddr, physicalAddr, p.tlsOptions, 
p.connectionTimeout, p.auth, connectingThroughProxy))
+       newCnx, wasCached := p.pool.LoadOrStore(logicalAddr.Host,
+               newConnection(logicalAddr, physicalAddr, p.tlsOptions, 
p.connectionTimeout, p.auth))
        cnx := newCnx.(*connection)
        if !wasCached {
                cnx.start()
diff --git a/pulsar/internal/lookup_service.go 
b/pulsar/internal/lookup_service.go
index e056ea5..ee6dffe 100644
--- a/pulsar/internal/lookup_service.go
+++ b/pulsar/internal/lookup_service.go
@@ -30,9 +30,8 @@ import (
 
 // LookupResult encapsulates a struct for lookup a request, containing two 
parts: LogicalAddr, PhysicalAddr.
 type LookupResult struct {
-       LogicalAddr            *url.URL
-       PhysicalAddr           *url.URL
-       ConnectingThroughProxy bool
+       LogicalAddr  *url.URL
+       PhysicalAddr *url.URL
 }
 
 // LookupService is a interface of lookup service.
@@ -106,7 +105,7 @@ func (ls *lookupService) Lookup(topic string) 
(*LookupResult, error) {
                                RequestId:     &id,
                                Topic:         &topic,
                                Authoritative: lr.Authoritative,
-                       }, false)
+                       })
                        if err != nil {
                                return nil, err
                        }
@@ -124,9 +123,8 @@ func (ls *lookupService) Lookup(topic string) 
(*LookupResult, error) {
                        }
 
                        return &LookupResult{
-                               LogicalAddr:            logicalAddress,
-                               PhysicalAddr:           physicalAddress,
-                               ConnectingThroughProxy: 
lr.GetProxyThroughServiceUrl(),
+                               LogicalAddr:  logicalAddress,
+                               PhysicalAddr: physicalAddress,
                        }, nil
 
                case pb.CommandLookupTopicResponse_Failed:
diff --git a/pulsar/internal/lookup_service_test.go 
b/pulsar/internal/lookup_service_test.go
index cce6198..5bb1724 100644
--- a/pulsar/internal/lookup_service_test.go
+++ b/pulsar/internal/lookup_service_test.go
@@ -70,7 +70,7 @@ func (c *mockedRPCClient) RequestToAnyBroker(requestID 
uint64, cmdType pb.BaseCo
 }
 
 func (c *mockedRPCClient) Request(logicalAddr *url.URL, physicalAddr *url.URL, 
requestID uint64,
-       cmdType pb.BaseCommand_Type, message proto.Message, 
connectingThroughProxy bool) (*RPCResult, error) {
+       cmdType pb.BaseCommand_Type, message proto.Message) (*RPCResult, error) 
{
        assert.Equal(c.t, cmdType, pb.BaseCommand_LOOKUP)
        expectedRequest := &c.expectedRequests[0]
        c.expectedRequests = c.expectedRequests[1:]
diff --git a/pulsar/internal/rpc_client.go b/pulsar/internal/rpc_client.go
index 3b5e622..23c3b61 100644
--- a/pulsar/internal/rpc_client.go
+++ b/pulsar/internal/rpc_client.go
@@ -45,7 +45,7 @@ type RPCClient interface {
        RequestToAnyBroker(requestID uint64, cmdType pb.BaseCommand_Type, 
message proto.Message) (*RPCResult, error)
 
        Request(logicalAddr *url.URL, physicalAddr *url.URL, requestID uint64,
-               cmdType pb.BaseCommand_Type, message proto.Message, 
connectingThroughProxy bool) (*RPCResult, error)
+               cmdType pb.BaseCommand_Type, message proto.Message) 
(*RPCResult, error)
 
        RequestOnCnxNoWait(cnx Connection, cmdType pb.BaseCommand_Type, message 
proto.Message)
 
@@ -71,13 +71,13 @@ func NewRPCClient(serviceURL *url.URL, pool ConnectionPool, 
requestTimeout time.
 
 func (c *rpcClient) RequestToAnyBroker(requestID uint64, cmdType 
pb.BaseCommand_Type,
        message proto.Message) (*RPCResult, error) {
-       return c.Request(c.serviceURL, c.serviceURL, requestID, cmdType, 
message, false)
+       return c.Request(c.serviceURL, c.serviceURL, requestID, cmdType, 
message)
 }
 
 func (c *rpcClient) Request(logicalAddr *url.URL, physicalAddr *url.URL, 
requestID uint64,
-       cmdType pb.BaseCommand_Type, message proto.Message, 
connectingThroughProxy bool) (*RPCResult, error) {
+       cmdType pb.BaseCommand_Type, message proto.Message) (*RPCResult, error) 
{
        // TODO: Add retry logic in case of connection issues
-       cnx, err := c.pool.GetConnection(logicalAddr, physicalAddr, 
connectingThroughProxy)
+       cnx, err := c.pool.GetConnection(logicalAddr, physicalAddr)
        if err != nil {
                return nil, err
        }
diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index 0963570..d0ea449 100644
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -140,8 +140,7 @@ func (p *partitionProducer) grabCnx() error {
        if len(p.options.Properties) > 0 {
                cmdProducer.Metadata = toKeyValues(p.options.Properties)
        }
-       res, err := p.client.rpcClient.Request(lr.LogicalAddr, lr.PhysicalAddr, 
id, pb.BaseCommand_PRODUCER, cmdProducer,
-               lr.ConnectingThroughProxy)
+       res, err := p.client.rpcClient.Request(lr.LogicalAddr, lr.PhysicalAddr, 
id, pb.BaseCommand_PRODUCER, cmdProducer)
        if err != nil {
                p.log.WithError(err).Error("Failed to create producer")
                return err

Reply via email to