This is an automated email from the ASF dual-hosted git repository.
zike pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new 75d2df3 [PIP-165] Auto release idle connections (#963)
75d2df3 is described below
commit 75d2df3b7d1d1d04fb660a1b6c11ede1d2f161bf
Author: Zike Yang <[email protected]>
AuthorDate: Fri Mar 3 16:27:13 2023 +0800
[PIP-165] Auto release idle connections (#963)
### Motivation
This is the Go implementation of
PIP-165: https://github.com/apache/pulsar/issues/15516
### Modifications
* Add new configuration `ConnectionMaxIdleTime` to `ClientOptions`
* Add a goroutine to `ConnectionPool` to periodically check for and release idle
connections.
---
pulsar/client.go | 4 ++
pulsar/client_impl.go | 14 ++++-
pulsar/client_impl_test.go | 115 +++++++++++++++++++++++++++++++++++++
pulsar/internal/connection.go | 48 ++++++++++++++++
pulsar/internal/connection_pool.go | 33 ++++++++++-
pulsar/internal/helper.go | 33 +++++++++++
6 files changed, 244 insertions(+), 3 deletions(-)
diff --git a/pulsar/client.go b/pulsar/client.go
index 75b363d..bc3f4f5 100644
--- a/pulsar/client.go
+++ b/pulsar/client.go
@@ -143,6 +143,10 @@ type ClientOptions struct {
// Default prometheus.DefaultRegisterer
MetricsRegisterer prometheus.Registerer
+ // Release the connection if it is not used for more than
ConnectionMaxIdleTime.
+ // Default is 180 seconds; positive values below 60 seconds are rejected. A negative value such as -1 disables it.
+ ConnectionMaxIdleTime time.Duration
+
EnableTransaction bool
// Limit of client memory usage (in byte). The 64M default can
guarantee a high producer throughput.
diff --git a/pulsar/client_impl.go b/pulsar/client_impl.go
index 7d90922..7c8fcc9 100644
--- a/pulsar/client_impl.go
+++ b/pulsar/client_impl.go
@@ -34,6 +34,8 @@ const (
defaultOperationTimeout = 30 * time.Second
defaultKeepAliveInterval = 30 * time.Second
defaultMemoryLimitBytes = 64 * 1024 * 1024
+ defaultConnMaxIdleTime = 180 * time.Second
+ minConnMaxIdleTime = 60 * time.Second
)
type client struct {
@@ -56,6 +58,16 @@ func newClient(options ClientOptions) (Client, error) {
logger = log.NewLoggerWithLogrus(logrus.StandardLogger())
}
+ connectionMaxIdleTime := options.ConnectionMaxIdleTime
+ if connectionMaxIdleTime == 0 {
+ connectionMaxIdleTime = defaultConnMaxIdleTime
+ } else if connectionMaxIdleTime > 0 && connectionMaxIdleTime <
minConnMaxIdleTime {
+ return nil, newError(InvalidConfiguration,
fmt.Sprintf("Connection max idle time should be at least %f "+
+ "seconds", minConnMaxIdleTime.Seconds()))
+ } else {
+ logger.Debugf("Disable auto release idle connections")
+ }
+
if options.URL == "" {
return nil, newError(InvalidConfiguration, "URL is required for
client")
}
@@ -143,7 +155,7 @@ func newClient(options ClientOptions) (Client, error) {
c := &client{
cnxPool: internal.NewConnectionPool(tlsConfig, authProvider,
connectionTimeout, keepAliveInterval,
- maxConnectionsPerHost, logger, metrics),
+ maxConnectionsPerHost, logger, metrics,
connectionMaxIdleTime),
log: logger,
metrics: metrics,
memLimit: internal.NewMemoryLimitController(memLimitBytes),
diff --git a/pulsar/client_impl_test.go b/pulsar/client_impl_test.go
index b8efe9a..bb28371 100644
--- a/pulsar/client_impl_test.go
+++ b/pulsar/client_impl_test.go
@@ -21,6 +21,7 @@ import (
"context"
"crypto/tls"
"fmt"
+ "log"
"net/http"
"net/http/httptest"
"os"
@@ -1123,3 +1124,117 @@ func TestServiceUrlTLSWithTLSTransportWithBasicAuth(t
*testing.T) {
func TestWebServiceUrlTLSWithTLSTransportWithBasicAuth(t *testing.T) {
testTLSTransportWithBasicAuth(t, webServiceURLTLS)
}
+
+func TestConfigureConnectionMaxIdleTime(t *testing.T) {
+ _, err := NewClient(ClientOptions{
+ URL: serviceURL,
+ ConnectionMaxIdleTime: 1 * time.Second,
+ })
+
+ assert.Error(t, err, "Should be failed when the connectionMaxIdleTime
is less than minConnMaxIdleTime")
+
+ cli, err := NewClient(ClientOptions{
+ URL: serviceURL,
+ ConnectionMaxIdleTime: -1, // Disabled
+ })
+
+ assert.Nil(t, err)
+ cli.Close()
+
+ cli, err = NewClient(ClientOptions{
+ URL: serviceURL,
+ ConnectionMaxIdleTime: 60 * time.Second,
+ })
+
+ assert.Nil(t, err)
+ cli.Close()
+}
+
+func testSendAndReceive(t *testing.T, producer Producer, consumer Consumer) {
+ // send 10 messages
+ for i := 0; i < 10; i++ {
+ if _, err := producer.Send(context.Background(),
&ProducerMessage{
+ Payload: []byte(fmt.Sprintf("hello-%d", i)),
+ }); err != nil {
+ log.Fatal(err)
+ }
+ }
+
+ // receive 10 messages
+ for i := 0; i < 10; i++ {
+ msg, err := consumer.Receive(context.Background())
+ if err != nil {
+ log.Fatal(err)
+ }
+
+ expectMsg := fmt.Sprintf("hello-%d", i)
+ assert.Equal(t, []byte(expectMsg), msg.Payload())
+ // ack message
+ err = consumer.Ack(msg)
+ if err != nil {
+ return
+ }
+ }
+}
+
+func TestAutoCloseIdleConnection(t *testing.T) {
+ cli, err := NewClient(ClientOptions{
+ URL: serviceURL,
+ ConnectionMaxIdleTime: -1, // Disable auto release connections
first, we will enable it manually later
+ })
+
+ assert.Nil(t, err)
+
+ topic := "TestAutoCloseIdleConnection"
+
+ // create consumer
+ consumer1, err := cli.Subscribe(ConsumerOptions{
+ Topic: topic,
+ SubscriptionName: "my-sub",
+ })
+ assert.Nil(t, err)
+
+ // create producer
+ producer1, err := cli.CreateProducer(ProducerOptions{
+ Topic: topic,
+ DisableBatching: false,
+ })
+ assert.Nil(t, err)
+
+ testSendAndReceive(t, producer1, consumer1)
+
+ pool := cli.(*client).cnxPool
+
+ producer1.Close()
+ consumer1.Close()
+
+ assert.NotEqual(t, 0, internal.GetConnectionsCount(&pool))
+
+ internal.StartCleanConnectionsTask(&pool, 2*time.Second) // Enable auto
idle connections release manually
+
+ time.Sleep(6 * time.Second) // Need to wait at least 3 *
ConnectionMaxIdleTime
+
+ assert.Equal(t, 0, internal.GetConnectionsCount(&pool))
+
+ // create consumer
+ consumer2, err := cli.Subscribe(ConsumerOptions{
+ Topic: topic,
+ SubscriptionName: "my-sub",
+ })
+ assert.Nil(t, err)
+
+ // create producer
+ producer2, err := cli.CreateProducer(ProducerOptions{
+ Topic: topic,
+ DisableBatching: false,
+ })
+ assert.Nil(t, err)
+
+ // Ensure the client still works
+ testSendAndReceive(t, producer2, consumer2)
+
+ producer2.Close()
+ consumer2.Close()
+
+ cli.Close()
+}
diff --git a/pulsar/internal/connection.go b/pulsar/internal/connection.go
index 09623e5..55d04f3 100644
--- a/pulsar/internal/connection.go
+++ b/pulsar/internal/connection.go
@@ -170,6 +170,8 @@ type connection struct {
metrics *Metrics
keepAliveInterval time.Duration
+
+ lastActive time.Time
}
// connectionOptions defines configurations for creating connection.
@@ -927,6 +929,52 @@ func (c *connection) UnregisterListener(id uint64) {
delete(c.listeners, id)
}
+func (c *connection) ResetLastActive() {
+ c.Lock()
+ defer c.Unlock()
+ c.lastActive = time.Now()
+}
+
+func (c *connection) isIdle() bool {
+ {
+ c.pendingLock.Lock()
+ defer c.pendingLock.Unlock()
+ if len(c.pendingReqs) != 0 {
+ return false
+ }
+ }
+
+ {
+ c.listenersLock.RLock()
+ defer c.listenersLock.RUnlock()
+ if len(c.listeners) != 0 {
+ return false
+ }
+ }
+
+ {
+ c.consumerHandlersLock.Lock()
+ defer c.consumerHandlersLock.Unlock()
+ if len(c.consumerHandlers) != 0 {
+ return false
+ }
+ }
+
+ if len(c.incomingRequestsCh) != 0 || len(c.writeRequestsCh) != 0 {
+ return false
+ }
+ return true
+}
+
+func (c *connection) CheckIdle(maxIdleTime time.Duration) bool {
+ // We don't need to lock here because this method should only be
+ // called in a single goroutine of the connectionPool
+ if !c.isIdle() {
+ c.lastActive = time.Now()
+ }
+ return time.Since(c.lastActive) > maxIdleTime
+}
+
// Close closes the connection by
// closing underlying socket connection and closeCh.
// This also triggers callbacks to the ConnectionClosed listeners.
diff --git a/pulsar/internal/connection_pool.go
b/pulsar/internal/connection_pool.go
index 4f6d656..bf1e297 100644
--- a/pulsar/internal/connection_pool.go
+++ b/pulsar/internal/connection_pool.go
@@ -47,6 +47,7 @@ type connectionPool struct {
maxConnectionsPerHost int32
roundRobinCnt int32
keepAliveInterval time.Duration
+ closeCh chan struct{}
metrics *Metrics
log log.Logger
@@ -60,8 +61,9 @@ func NewConnectionPool(
keepAliveInterval time.Duration,
maxConnectionsPerHost int,
logger log.Logger,
- metrics *Metrics) ConnectionPool {
- return &connectionPool{
+ metrics *Metrics,
+ connectionMaxIdleTime time.Duration) ConnectionPool {
+ p := &connectionPool{
connections: make(map[string]*connection),
tlsOptions: tlsOptions,
auth: auth,
@@ -70,7 +72,10 @@ func NewConnectionPool(
keepAliveInterval: keepAliveInterval,
log: logger,
metrics: metrics,
+ closeCh: make(chan struct{}),
}
+ go p.checkAndCleanIdleConnections(connectionMaxIdleTime)
+ return p
}
func (p *connectionPool) GetConnection(logicalAddr *url.URL, physicalAddr
*url.URL) (Connection, error) {
@@ -109,6 +114,7 @@ func (p *connectionPool) GetConnection(logicalAddr
*url.URL, physicalAddr *url.U
p.Unlock()
conn.start()
} else {
+ conn.ResetLastActive()
// we already have a connection
p.Unlock()
}
@@ -119,6 +125,7 @@ func (p *connectionPool) GetConnection(logicalAddr
*url.URL, physicalAddr *url.U
func (p *connectionPool) Close() {
p.Lock()
+ close(p.closeCh)
for k, c := range p.connections {
delete(p.connections, k)
c.Close()
@@ -134,3 +141,25 @@ func (p *connectionPool) getMapKey(addr *url.URL) string {
idx := cnt % p.maxConnectionsPerHost
return fmt.Sprint(addr.Host, '-', idx)
}
+
+func (p *connectionPool) checkAndCleanIdleConnections(maxIdleTime
time.Duration) {
+ if maxIdleTime < 0 {
+ return
+ }
+ for {
+ select {
+ case <-p.closeCh:
+ return
+ case <-time.After(maxIdleTime):
+ p.Lock()
+ for k, c := range p.connections {
+ if c.CheckIdle(maxIdleTime) {
+ c.log.Debugf("Closed connection due to
inactivity.")
+ delete(p.connections, k)
+ c.Close()
+ }
+ }
+ p.Unlock()
+ }
+ }
+}
diff --git a/pulsar/internal/helper.go b/pulsar/internal/helper.go
new file mode 100644
index 0000000..3bca1ee
--- /dev/null
+++ b/pulsar/internal/helper.go
@@ -0,0 +1,33 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package internal
+
+import "time"
+
+// These methods should only be used by tests.
+
+func StartCleanConnectionsTask(p *ConnectionPool, connectionMaxIdleTime
time.Duration) {
+ go
(*p).(*connectionPool).checkAndCleanIdleConnections(connectionMaxIdleTime)
+}
+
+func GetConnectionsCount(p *ConnectionPool) int {
+ pool := (*p).(*connectionPool)
+ pool.Lock()
+ defer pool.Unlock()
+ return len(pool.connections)
+}