This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new f8dc88e  Make keepalive interval configurable (#838)
f8dc88e is described below

commit f8dc88e9a9eb070ea8d9f3d6c31b2b00e600567c
Author: Zixuan Liu <[email protected]>
AuthorDate: Sat Sep 3 01:47:57 2022 +0800

    Make keepalive interval configurable (#838)
    
    * Make keepalive interval configurable
    
    Signed-off-by: Zixuan Liu <[email protected]>
    
    * Fix style
    
    Signed-off-by: Zixuan Liu <[email protected]>
    
    Signed-off-by: Zixuan Liu <[email protected]>
---
 pulsar/client.go                   |  3 +++
 pulsar/client_impl.go              | 10 ++++++++--
 pulsar/internal/connection.go      | 14 ++++++++------
 pulsar/internal/connection_pool.go |  8 ++++++--
 4 files changed, 25 insertions(+), 10 deletions(-)

diff --git a/pulsar/client.go b/pulsar/client.go
index 9a860ce..22b12ef 100644
--- a/pulsar/client.go
+++ b/pulsar/client.go
@@ -98,6 +98,9 @@ type ClientOptions struct {
        // operation will be marked as failed
        OperationTimeout time.Duration
 
+       // Configure the ping send and check interval, default to 30 seconds.
+       KeepAliveInterval time.Duration
+
        // Configure the authentication provider. (default: no authentication)
        // Example: `Authentication: NewAuthenticationTLS("my-cert.pem", "my-key.pem")`
        Authentication
diff --git a/pulsar/client_impl.go b/pulsar/client_impl.go
index 1bf661e..e7fa642 100644
--- a/pulsar/client_impl.go
+++ b/pulsar/client_impl.go
@@ -33,6 +33,7 @@ import (
 const (
        defaultConnectionTimeout = 10 * time.Second
        defaultOperationTimeout  = 30 * time.Second
+       defaultKeepAliveInterval = 30 * time.Second
 )
 
 type client struct {
@@ -125,9 +126,14 @@ func newClient(options ClientOptions) (Client, error) {
                        int(options.MetricsCardinality), map[string]string{}, options.MetricsRegisterer)
        }
 
+       keepAliveInterval := options.KeepAliveInterval
+       if keepAliveInterval.Nanoseconds() == 0 {
+               keepAliveInterval = defaultKeepAliveInterval
+       }
+
        c := &client{
-               cnxPool: internal.NewConnectionPool(tlsConfig, authProvider, connectionTimeout, maxConnectionsPerHost, logger,
-                       metrics),
+               cnxPool: internal.NewConnectionPool(tlsConfig, authProvider, connectionTimeout, keepAliveInterval,
+                       maxConnectionsPerHost, logger, metrics),
                log:     logger,
                metrics: metrics,
        }
diff --git a/pulsar/internal/connection.go b/pulsar/internal/connection.go
index 48dfd8f..8ddbc04 100644
--- a/pulsar/internal/connection.go
+++ b/pulsar/internal/connection.go
@@ -114,8 +114,6 @@ func (s connectionState) String() string {
        }
 }
 
-const keepAliveInterval = 30 * time.Second
-
 type request struct {
        id       *uint64
        cmd      *pb.BaseCommand
@@ -168,6 +166,8 @@ type connection struct {
 
        maxMessageSize int32
        metrics        *Metrics
+
+       keepAliveInterval time.Duration
 }
 
 // connectionOptions defines configurations for creating connection.
@@ -179,11 +179,13 @@ type connectionOptions struct {
        auth              auth.Provider
        logger            log.Logger
        metrics           *Metrics
+       keepAliveInterval time.Duration
 }
 
 func newConnection(opts connectionOptions) *connection {
        cnx := &connection{
                connectionTimeout:    opts.connectionTimeout,
+               keepAliveInterval:    opts.keepAliveInterval,
                logicalAddr:          opts.logicalAddr,
                physicalAddr:         opts.physicalAddr,
                writeBuffer:          NewBuffer(4096),
@@ -285,7 +287,7 @@ func (c *connection) doHandshake() bool {
 
        // During the initial handshake, the internal keep alive is not
        // active yet, so we need to timeout write and read requests
-       c.cnx.SetDeadline(time.Now().Add(keepAliveInterval))
+       c.cnx.SetDeadline(time.Now().Add(c.keepAliveInterval))
        cmdConnect := &pb.CommandConnect{
                ProtocolVersion: proto.Int32(PulsarProtocolVersion),
                ClientVersion:   proto.String(ClientVersionString),
@@ -369,8 +371,8 @@ func (c *connection) failLeftRequestsWhenClose() {
 }
 
 func (c *connection) run() {
-       pingSendTicker := time.NewTicker(keepAliveInterval)
-       pingCheckTicker := time.NewTicker(keepAliveInterval)
+       pingSendTicker := time.NewTicker(c.keepAliveInterval)
+       pingCheckTicker := time.NewTicker(c.keepAliveInterval)
 
        defer func() {
                // stop tickers
@@ -432,7 +434,7 @@ func (c *connection) runPingCheck(pingCheckTicker *time.Ticker) {
                case <-c.closeCh:
                        return
                case <-pingCheckTicker.C:
-                       if c.lastDataReceived().Add(2 * keepAliveInterval).Before(time.Now()) {
+                       if c.lastDataReceived().Add(2 * c.keepAliveInterval).Before(time.Now()) {
                                // We have not received a response to the previous Ping request, the
                                // connection to broker is stale
                                c.log.Warn("Detected stale connection to broker")
diff --git a/pulsar/internal/connection_pool.go b/pulsar/internal/connection_pool.go
index 5ec457e..cb172a1 100644
--- a/pulsar/internal/connection_pool.go
+++ b/pulsar/internal/connection_pool.go
@@ -45,9 +45,10 @@ type connectionPool struct {
        auth                  auth.Provider
        maxConnectionsPerHost int32
        roundRobinCnt         int32
-       metrics               *Metrics
+       keepAliveInterval     time.Duration
 
-       log log.Logger
+       metrics *Metrics
+       log     log.Logger
 }
 
 // NewConnectionPool init connection pool.
@@ -55,6 +56,7 @@ func NewConnectionPool(
        tlsOptions *TLSOptions,
        auth auth.Provider,
        connectionTimeout time.Duration,
+       keepAliveInterval time.Duration,
        maxConnectionsPerHost int,
        logger log.Logger,
        metrics *Metrics) ConnectionPool {
@@ -64,6 +66,7 @@ func NewConnectionPool(
                auth:                  auth,
                connectionTimeout:     connectionTimeout,
                maxConnectionsPerHost: int32(maxConnectionsPerHost),
+               keepAliveInterval:     keepAliveInterval,
                log:                   logger,
                metrics:               metrics,
        }
@@ -97,6 +100,7 @@ func (p *connectionPool) GetConnection(logicalAddr *url.URL, physicalAddr *url.U
                        tls:               p.tlsOptions,
                        connectionTimeout: p.connectionTimeout,
                        auth:              p.auth,
+                       keepAliveInterval: p.keepAliveInterval,
                        logger:            p.log,
                        metrics:           p.metrics,
                })

Reply via email to