This is an automated email from the ASF dual-hosted git repository.

zike pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new 75d2df3  [PIP-165] Auto release idle connections (#963)
75d2df3 is described below

commit 75d2df3b7d1d1d04fb660a1b6c11ede1d2f161bf
Author: Zike Yang <[email protected]>
AuthorDate: Fri Mar 3 16:27:13 2023 +0800

    [PIP-165] Auto release idle connections (#963)
    
    ### Motivation
    
    The Go implementation of 
PIP-165: https://github.com/apache/pulsar/issues/15516
    
    
    ### Modifications
    
    * Add new configuration `ConnectionMaxIdleTime` to `ClientOptions`
    * Add a goroutine to `ConnectionPool` to periodically check and release idle 
connections.
---
 pulsar/client.go                   |   4 ++
 pulsar/client_impl.go              |  14 ++++-
 pulsar/client_impl_test.go         | 115 +++++++++++++++++++++++++++++++++++++
 pulsar/internal/connection.go      |  48 ++++++++++++++++
 pulsar/internal/connection_pool.go |  33 ++++++++++-
 pulsar/internal/helper.go          |  33 +++++++++++
 6 files changed, 244 insertions(+), 3 deletions(-)

diff --git a/pulsar/client.go b/pulsar/client.go
index 75b363d..bc3f4f5 100644
--- a/pulsar/client.go
+++ b/pulsar/client.go
@@ -143,6 +143,10 @@ type ClientOptions struct {
        // Default prometheus.DefaultRegisterer
        MetricsRegisterer prometheus.Registerer
 
+       // Release the connection if it is not used for more than 
ConnectionMaxIdleTime.
+       // Default is 180 seconds, negative such as -1 to disable.
+       ConnectionMaxIdleTime time.Duration
+
        EnableTransaction bool
 
        // Limit of client memory usage (in byte). The 64M default can 
guarantee a high producer throughput.
diff --git a/pulsar/client_impl.go b/pulsar/client_impl.go
index 7d90922..7c8fcc9 100644
--- a/pulsar/client_impl.go
+++ b/pulsar/client_impl.go
@@ -34,6 +34,8 @@ const (
        defaultOperationTimeout  = 30 * time.Second
        defaultKeepAliveInterval = 30 * time.Second
        defaultMemoryLimitBytes  = 64 * 1024 * 1024
+       defaultConnMaxIdleTime   = 180 * time.Second
+       minConnMaxIdleTime       = 60 * time.Second
 )
 
 type client struct {
@@ -56,6 +58,16 @@ func newClient(options ClientOptions) (Client, error) {
                logger = log.NewLoggerWithLogrus(logrus.StandardLogger())
        }
 
+       connectionMaxIdleTime := options.ConnectionMaxIdleTime
+       if connectionMaxIdleTime == 0 {
+               connectionMaxIdleTime = defaultConnMaxIdleTime
+       } else if connectionMaxIdleTime > 0 && connectionMaxIdleTime < 
minConnMaxIdleTime {
+               return nil, newError(InvalidConfiguration, 
fmt.Sprintf("Connection max idle time should be at least %f "+
+                       "seconds", minConnMaxIdleTime.Seconds()))
+       } else {
+               logger.Debugf("Disable auto release idle connections")
+       }
+
        if options.URL == "" {
                return nil, newError(InvalidConfiguration, "URL is required for 
client")
        }
@@ -143,7 +155,7 @@ func newClient(options ClientOptions) (Client, error) {
 
        c := &client{
                cnxPool: internal.NewConnectionPool(tlsConfig, authProvider, 
connectionTimeout, keepAliveInterval,
-                       maxConnectionsPerHost, logger, metrics),
+                       maxConnectionsPerHost, logger, metrics, 
connectionMaxIdleTime),
                log:      logger,
                metrics:  metrics,
                memLimit: internal.NewMemoryLimitController(memLimitBytes),
diff --git a/pulsar/client_impl_test.go b/pulsar/client_impl_test.go
index b8efe9a..bb28371 100644
--- a/pulsar/client_impl_test.go
+++ b/pulsar/client_impl_test.go
@@ -21,6 +21,7 @@ import (
        "context"
        "crypto/tls"
        "fmt"
+       "log"
        "net/http"
        "net/http/httptest"
        "os"
@@ -1123,3 +1124,117 @@ func TestServiceUrlTLSWithTLSTransportWithBasicAuth(t 
*testing.T) {
 func TestWebServiceUrlTLSWithTLSTransportWithBasicAuth(t *testing.T) {
        testTLSTransportWithBasicAuth(t, webServiceURLTLS)
 }
+
+func TestConfigureConnectionMaxIdleTime(t *testing.T) {
+       _, err := NewClient(ClientOptions{
+               URL:                   serviceURL,
+               ConnectionMaxIdleTime: 1 * time.Second,
+       })
+
+       assert.Error(t, err, "Should be failed when the connectionMaxIdleTime 
is less than minConnMaxIdleTime")
+
+       cli, err := NewClient(ClientOptions{
+               URL:                   serviceURL,
+               ConnectionMaxIdleTime: -1, // Disabled
+       })
+
+       assert.Nil(t, err)
+       cli.Close()
+
+       cli, err = NewClient(ClientOptions{
+               URL:                   serviceURL,
+               ConnectionMaxIdleTime: 60 * time.Second,
+       })
+
+       assert.Nil(t, err)
+       cli.Close()
+}
+
+func testSendAndReceive(t *testing.T, producer Producer, consumer Consumer) {
+       // send 10 messages
+       for i := 0; i < 10; i++ {
+               if _, err := producer.Send(context.Background(), 
&ProducerMessage{
+                       Payload: []byte(fmt.Sprintf("hello-%d", i)),
+               }); err != nil {
+                       log.Fatal(err)
+               }
+       }
+
+       // receive 10 messages
+       for i := 0; i < 10; i++ {
+               msg, err := consumer.Receive(context.Background())
+               if err != nil {
+                       log.Fatal(err)
+               }
+
+               expectMsg := fmt.Sprintf("hello-%d", i)
+               assert.Equal(t, []byte(expectMsg), msg.Payload())
+               // ack message
+               err = consumer.Ack(msg)
+               if err != nil {
+                       return
+               }
+       }
+}
+
+func TestAutoCloseIdleConnection(t *testing.T) {
+       cli, err := NewClient(ClientOptions{
+               URL:                   serviceURL,
+               ConnectionMaxIdleTime: -1, // Disable auto release connections 
first, we will enable it manually later
+       })
+
+       assert.Nil(t, err)
+
+       topic := "TestAutoCloseIdleConnection"
+
+       // create consumer
+       consumer1, err := cli.Subscribe(ConsumerOptions{
+               Topic:            topic,
+               SubscriptionName: "my-sub",
+       })
+       assert.Nil(t, err)
+
+       // create producer
+       producer1, err := cli.CreateProducer(ProducerOptions{
+               Topic:           topic,
+               DisableBatching: false,
+       })
+       assert.Nil(t, err)
+
+       testSendAndReceive(t, producer1, consumer1)
+
+       pool := cli.(*client).cnxPool
+
+       producer1.Close()
+       consumer1.Close()
+
+       assert.NotEqual(t, 0, internal.GetConnectionsCount(&pool))
+
+       internal.StartCleanConnectionsTask(&pool, 2*time.Second) // Enable auto 
idle connections release manually
+
+       time.Sleep(6 * time.Second) // Need to wait at least 3 * 
ConnectionMaxIdleTime
+
+       assert.Equal(t, 0, internal.GetConnectionsCount(&pool))
+
+       // create consumer
+       consumer2, err := cli.Subscribe(ConsumerOptions{
+               Topic:            topic,
+               SubscriptionName: "my-sub",
+       })
+       assert.Nil(t, err)
+
+       // create producer
+       producer2, err := cli.CreateProducer(ProducerOptions{
+               Topic:           topic,
+               DisableBatching: false,
+       })
+       assert.Nil(t, err)
+
+       // Ensure the client still works
+       testSendAndReceive(t, producer2, consumer2)
+
+       producer2.Close()
+       consumer2.Close()
+
+       cli.Close()
+}
diff --git a/pulsar/internal/connection.go b/pulsar/internal/connection.go
index 09623e5..55d04f3 100644
--- a/pulsar/internal/connection.go
+++ b/pulsar/internal/connection.go
@@ -170,6 +170,8 @@ type connection struct {
        metrics        *Metrics
 
        keepAliveInterval time.Duration
+
+       lastActive time.Time
 }
 
 // connectionOptions defines configurations for creating connection.
@@ -927,6 +929,52 @@ func (c *connection) UnregisterListener(id uint64) {
        delete(c.listeners, id)
 }
 
+func (c *connection) ResetLastActive() {
+       c.Lock()
+       defer c.Unlock()
+       c.lastActive = time.Now()
+}
+
+func (c *connection) isIdle() bool {
+       {
+               c.pendingLock.Lock()
+               defer c.pendingLock.Unlock()
+               if len(c.pendingReqs) != 0 {
+                       return false
+               }
+       }
+
+       {
+               c.listenersLock.RLock()
+               defer c.listenersLock.RUnlock()
+               if len(c.listeners) != 0 {
+                       return false
+               }
+       }
+
+       {
+               c.consumerHandlersLock.Lock()
+               defer c.consumerHandlersLock.Unlock()
+               if len(c.consumerHandlers) != 0 {
+                       return false
+               }
+       }
+
+       if len(c.incomingRequestsCh) != 0 || len(c.writeRequestsCh) != 0 {
+               return false
+       }
+       return true
+}
+
+func (c *connection) CheckIdle(maxIdleTime time.Duration) bool {
+       // We don't need to lock here because this method should only be
+       // called in a single goroutine of the connectionPool
+       if !c.isIdle() {
+               c.lastActive = time.Now()
+       }
+       return time.Since(c.lastActive) > maxIdleTime
+}
+
 // Close closes the connection by
 // closing underlying socket connection and closeCh.
 // This also triggers callbacks to the ConnectionClosed listeners.
diff --git a/pulsar/internal/connection_pool.go 
b/pulsar/internal/connection_pool.go
index 4f6d656..bf1e297 100644
--- a/pulsar/internal/connection_pool.go
+++ b/pulsar/internal/connection_pool.go
@@ -47,6 +47,7 @@ type connectionPool struct {
        maxConnectionsPerHost int32
        roundRobinCnt         int32
        keepAliveInterval     time.Duration
+       closeCh               chan struct{}
 
        metrics *Metrics
        log     log.Logger
@@ -60,8 +61,9 @@ func NewConnectionPool(
        keepAliveInterval time.Duration,
        maxConnectionsPerHost int,
        logger log.Logger,
-       metrics *Metrics) ConnectionPool {
-       return &connectionPool{
+       metrics *Metrics,
+       connectionMaxIdleTime time.Duration) ConnectionPool {
+       p := &connectionPool{
                connections:           make(map[string]*connection),
                tlsOptions:            tlsOptions,
                auth:                  auth,
@@ -70,7 +72,10 @@ func NewConnectionPool(
                keepAliveInterval:     keepAliveInterval,
                log:                   logger,
                metrics:               metrics,
+               closeCh:               make(chan struct{}),
        }
+       go p.checkAndCleanIdleConnections(connectionMaxIdleTime)
+       return p
 }
 
 func (p *connectionPool) GetConnection(logicalAddr *url.URL, physicalAddr 
*url.URL) (Connection, error) {
@@ -109,6 +114,7 @@ func (p *connectionPool) GetConnection(logicalAddr 
*url.URL, physicalAddr *url.U
                p.Unlock()
                conn.start()
        } else {
+               conn.ResetLastActive()
                // we already have a connection
                p.Unlock()
        }
@@ -119,6 +125,7 @@ func (p *connectionPool) GetConnection(logicalAddr 
*url.URL, physicalAddr *url.U
 
 func (p *connectionPool) Close() {
        p.Lock()
+       close(p.closeCh)
        for k, c := range p.connections {
                delete(p.connections, k)
                c.Close()
@@ -134,3 +141,25 @@ func (p *connectionPool) getMapKey(addr *url.URL) string {
        idx := cnt % p.maxConnectionsPerHost
        return fmt.Sprint(addr.Host, '-', idx)
 }
+
+func (p *connectionPool) checkAndCleanIdleConnections(maxIdleTime 
time.Duration) {
+       if maxIdleTime < 0 {
+               return
+       }
+       for {
+               select {
+               case <-p.closeCh:
+                       return
+               case <-time.After(maxIdleTime):
+                       p.Lock()
+                       for k, c := range p.connections {
+                               if c.CheckIdle(maxIdleTime) {
+                                       c.log.Debugf("Closed connection due to 
inactivity.")
+                                       delete(p.connections, k)
+                                       c.Close()
+                               }
+                       }
+                       p.Unlock()
+               }
+       }
+}
diff --git a/pulsar/internal/helper.go b/pulsar/internal/helper.go
new file mode 100644
index 0000000..3bca1ee
--- /dev/null
+++ b/pulsar/internal/helper.go
@@ -0,0 +1,33 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package internal
+
+import "time"
+
+// These methods should only be used by tests.
+
+func StartCleanConnectionsTask(p *ConnectionPool, connectionMaxIdleTime 
time.Duration) {
+       go 
(*p).(*connectionPool).checkAndCleanIdleConnections(connectionMaxIdleTime)
+}
+
+func GetConnectionsCount(p *ConnectionPool) int {
+       pool := (*p).(*connectionPool)
+       pool.Lock()
+       defer pool.Unlock()
+       return len(pool.connections)
+}

Reply via email to