This is an automated email from the ASF dual-hosted git repository.
rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new d6b14c6 Fixed the connection pool logic (#161)
d6b14c6 is described below
commit d6b14c66b81c0d2c00cf21587e7e7042c5fa1f56
Author: Matteo Merli <[email protected]>
AuthorDate: Sun Jan 5 05:17:02 2020 -0800
Fixed the connection pool logic (#161)
* Fixed the connection pool logic
* Fixed mocked object for tests
* Fixed indentation
---
pulsar/consumer_partition.go | 2 +-
pulsar/internal/connection.go | 37 +++++++++++++++++-----------------
pulsar/internal/connection_pool.go | 12 +++++------
pulsar/internal/lookup_service.go | 12 +++++------
pulsar/internal/lookup_service_test.go | 2 +-
pulsar/internal/rpc_client.go | 8 ++++----
pulsar/producer_partition.go | 3 +--
7 files changed, 35 insertions(+), 41 deletions(-)
diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go
index 9d47581..55d1318 100644
--- a/pulsar/consumer_partition.go
+++ b/pulsar/consumer_partition.go
@@ -530,7 +530,7 @@ func (pc *partitionConsumer) grabConn() error {
}
res, err := pc.client.rpcClient.Request(lr.LogicalAddr,
lr.PhysicalAddr, requestID,
- pb.BaseCommand_SUBSCRIBE, cmdSubscribe, lr.ConnectingThroughProxy)
+ pb.BaseCommand_SUBSCRIBE, cmdSubscribe)
if err != nil {
pc.log.WithError(err).Error("Failed to create consumer")
diff --git a/pulsar/internal/connection.go b/pulsar/internal/connection.go
index 590b92f..b50ef12 100644
--- a/pulsar/internal/connection.go
+++ b/pulsar/internal/connection.go
@@ -119,10 +119,9 @@ type connection struct {
state connectionState
connectionTimeout time.Duration
- logicalAddr *url.URL
- physicalAddr *url.URL
- connectingThroughProxy bool
- cnx net.Conn
+ logicalAddr *url.URL
+ physicalAddr *url.URL
+ cnx net.Conn
writeBufferLock sync.Mutex
writeBuffer Buffer
@@ -153,21 +152,20 @@ type connection struct {
}
func newConnection(logicalAddr *url.URL, physicalAddr *url.URL, tlsOptions *TLSOptions,
- connectionTimeout time.Duration, auth auth.Provider, connectingThroughProxy bool) *connection {
+ connectionTimeout time.Duration, auth auth.Provider) *connection {
cnx := &connection{
- state: connectionInit,
- connectionTimeout: connectionTimeout,
- logicalAddr: logicalAddr,
- physicalAddr: physicalAddr,
- connectingThroughProxy: connectingThroughProxy,
- writeBuffer: NewBuffer(4096),
- log: log.WithField("remote_addr", physicalAddr),
- pendingReqs: make(map[uint64]*request),
- lastDataReceivedTime: time.Now(),
- pingTicker: time.NewTicker(keepAliveInterval),
- pingCheckTicker: time.NewTicker(keepAliveInterval),
- tlsOptions: tlsOptions,
- auth: auth,
+ state: connectionInit,
+ connectionTimeout: connectionTimeout,
+ logicalAddr: logicalAddr,
+ physicalAddr: physicalAddr,
+ writeBuffer: NewBuffer(4096),
+ log: log.WithField("remote_addr", physicalAddr),
+ pendingReqs: make(map[uint64]*request),
+ lastDataReceivedTime: time.Now(),
+ pingTicker: time.NewTicker(keepAliveInterval),
+ pingCheckTicker: time.NewTicker(keepAliveInterval),
+ tlsOptions: tlsOptions,
+ auth: auth,
closeCh: make(chan interface{}),
incomingRequestsCh: make(chan *request, 10),
@@ -256,7 +254,8 @@ func (c *connection) doHandshake() bool {
AuthMethodName: proto.String(c.auth.Name()),
AuthData: authData,
}
- if c.connectingThroughProxy {
+
+ if c.logicalAddr.Host != c.physicalAddr.Host {
cmdConnect.ProxyToBrokerUrl = proto.String(c.logicalAddr.Host)
}
c.writeCommand(baseCommand(pb.BaseCommand_CONNECT, cmdConnect))
diff --git a/pulsar/internal/connection_pool.go b/pulsar/internal/connection_pool.go
index efb5cf1..14f7753 100644
--- a/pulsar/internal/connection_pool.go
+++ b/pulsar/internal/connection_pool.go
@@ -18,7 +18,6 @@
package internal
import (
- "fmt"
"net/url"
"sync"
"time"
@@ -31,7 +30,7 @@ import (
// ConnectionPool is a interface of connection pool.
type ConnectionPool interface {
// GetConnection get a connection from ConnectionPool.
- GetConnection(logicalAddr *url.URL, physicalAddr *url.URL, connectingThroughProxy bool) (Connection, error)
+ GetConnection(logicalAddr *url.URL, physicalAddr *url.URL) (Connection, error)
// Close all the connections in the pool
Close()
@@ -53,9 +52,8 @@ func NewConnectionPool(tlsOptions *TLSOptions, auth auth.Provider, connectionTim
}
}
-func (p *connectionPool) GetConnection(logicalAddr *url.URL, physicalAddr *url.URL,
- connectingThroughProxy bool) (Connection, error) {
- cachedCnx, found := p.pool.Load(fmt.Sprintf("%s:%v", logicalAddr.Host, connectingThroughProxy))
+func (p *connectionPool) GetConnection(logicalAddr *url.URL, physicalAddr *url.URL) (Connection, error) {
+ cachedCnx, found := p.pool.Load(logicalAddr.Host)
if found {
cnx := cachedCnx.(*connection)
log.Debug("Found connection in cache:", cnx.logicalAddr, cnx.physicalAddr)
@@ -70,8 +68,8 @@ func (p *connectionPool) GetConnection(logicalAddr *url.URL, physicalAddr *url.U
}
// Try to create a new connection
- newCnx, wasCached := p.pool.LoadOrStore(fmt.Sprintf("%s:%v", logicalAddr.Host, connectingThroughProxy),
- newConnection(logicalAddr, physicalAddr, p.tlsOptions, p.connectionTimeout, p.auth, connectingThroughProxy))
+ newCnx, wasCached := p.pool.LoadOrStore(logicalAddr.Host,
+ newConnection(logicalAddr, physicalAddr, p.tlsOptions, p.connectionTimeout, p.auth))
cnx := newCnx.(*connection)
if !wasCached {
cnx.start()
diff --git a/pulsar/internal/lookup_service.go b/pulsar/internal/lookup_service.go
index e056ea5..ee6dffe 100644
--- a/pulsar/internal/lookup_service.go
+++ b/pulsar/internal/lookup_service.go
@@ -30,9 +30,8 @@ import (
// LookupResult encapsulates a struct for lookup a request, containing two parts: LogicalAddr, PhysicalAddr.
type LookupResult struct {
- LogicalAddr *url.URL
- PhysicalAddr *url.URL
- ConnectingThroughProxy bool
+ LogicalAddr *url.URL
+ PhysicalAddr *url.URL
}
// LookupService is a interface of lookup service.
@@ -106,7 +105,7 @@ func (ls *lookupService) Lookup(topic string) (*LookupResult, error) {
RequestId: &id,
Topic: &topic,
Authoritative: lr.Authoritative,
- }, false)
+ })
if err != nil {
return nil, err
}
@@ -124,9 +123,8 @@ func (ls *lookupService) Lookup(topic string) (*LookupResult, error) {
}
return &LookupResult{
- LogicalAddr: logicalAddress,
- PhysicalAddr: physicalAddress,
- ConnectingThroughProxy: lr.GetProxyThroughServiceUrl(),
+ LogicalAddr: logicalAddress,
+ PhysicalAddr: physicalAddress,
}, nil
case pb.CommandLookupTopicResponse_Failed:
diff --git a/pulsar/internal/lookup_service_test.go b/pulsar/internal/lookup_service_test.go
index cce6198..5bb1724 100644
--- a/pulsar/internal/lookup_service_test.go
+++ b/pulsar/internal/lookup_service_test.go
@@ -70,7 +70,7 @@ func (c *mockedRPCClient) RequestToAnyBroker(requestID uint64, cmdType pb.BaseCo
}
func (c *mockedRPCClient) Request(logicalAddr *url.URL, physicalAddr *url.URL, requestID uint64,
- cmdType pb.BaseCommand_Type, message proto.Message, connectingThroughProxy bool) (*RPCResult, error) {
+ cmdType pb.BaseCommand_Type, message proto.Message) (*RPCResult, error) {
assert.Equal(c.t, cmdType, pb.BaseCommand_LOOKUP)
expectedRequest := &c.expectedRequests[0]
c.expectedRequests = c.expectedRequests[1:]
diff --git a/pulsar/internal/rpc_client.go b/pulsar/internal/rpc_client.go
index 3b5e622..23c3b61 100644
--- a/pulsar/internal/rpc_client.go
+++ b/pulsar/internal/rpc_client.go
@@ -45,7 +45,7 @@ type RPCClient interface {
RequestToAnyBroker(requestID uint64, cmdType pb.BaseCommand_Type, message proto.Message) (*RPCResult, error)
Request(logicalAddr *url.URL, physicalAddr *url.URL, requestID uint64,
- cmdType pb.BaseCommand_Type, message proto.Message, connectingThroughProxy bool) (*RPCResult, error)
+ cmdType pb.BaseCommand_Type, message proto.Message) (*RPCResult, error)
RequestOnCnxNoWait(cnx Connection, cmdType pb.BaseCommand_Type, message proto.Message)
@@ -71,13 +71,13 @@ func NewRPCClient(serviceURL *url.URL, pool ConnectionPool, requestTimeout time.
func (c *rpcClient) RequestToAnyBroker(requestID uint64, cmdType pb.BaseCommand_Type,
message proto.Message) (*RPCResult, error) {
- return c.Request(c.serviceURL, c.serviceURL, requestID, cmdType, message, false)
+ return c.Request(c.serviceURL, c.serviceURL, requestID, cmdType, message)
}
func (c *rpcClient) Request(logicalAddr *url.URL, physicalAddr *url.URL, requestID uint64,
- cmdType pb.BaseCommand_Type, message proto.Message, connectingThroughProxy bool) (*RPCResult, error) {
+ cmdType pb.BaseCommand_Type, message proto.Message) (*RPCResult, error) {
// TODO: Add retry logic in case of connection issues
- cnx, err := c.pool.GetConnection(logicalAddr, physicalAddr, connectingThroughProxy)
+ cnx, err := c.pool.GetConnection(logicalAddr, physicalAddr)
if err != nil {
return nil, err
}
diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index 0963570..d0ea449 100644
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -140,8 +140,7 @@ func (p *partitionProducer) grabCnx() error {
if len(p.options.Properties) > 0 {
cmdProducer.Metadata = toKeyValues(p.options.Properties)
}
- res, err := p.client.rpcClient.Request(lr.LogicalAddr, lr.PhysicalAddr, id, pb.BaseCommand_PRODUCER, cmdProducer,
- lr.ConnectingThroughProxy)
+ res, err := p.client.rpcClient.Request(lr.LogicalAddr, lr.PhysicalAddr, id, pb.BaseCommand_PRODUCER, cmdProducer)
if err != nil {
p.log.WithError(err).Error("Failed to create producer")
return err