This is an automated email from the ASF dual-hosted git repository.
aloyszhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 9995c22774 [INLONG-11668][SDK] Add max life time support for the
connections in conn pool of Golang SDK (#11669)
9995c22774 is described below
commit 9995c22774559b32f828935060d5c7e12085b2bb
Author: gunli <[email protected]>
AuthorDate: Wed Jan 15 11:05:37 2025 +0800
[INLONG-11668][SDK] Add max life time support for the connections in conn
pool of Golang SDK (#11669)
---
.../dataproxy-sdk-golang/README.md | 2 +
.../dataproxy-sdk-golang/connpool/connpool.go | 191 ++++++++++++++++++---
.../dataproxy-sdk-golang/dataproxy/client.go | 10 +-
.../dataproxy-sdk-golang/dataproxy/options.go | 4 +-
.../dataproxy/options_basic.go | 7 +
5 files changed, 180 insertions(+), 34 deletions(-)
diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/README.md
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/README.md
index 3273b2a464..60c8336970 100755
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/README.md
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/README.md
@@ -161,6 +161,8 @@ type Options struct {
BlockIfQueueIsFull bool // whether Send and
SendAsync block if producer's message queue is full, default: false
AddColumns map[string]string // addition columns to
add to the message, for example: __addcol1__worldid=xxx&__addcol2__ip=yyy, all
the message will be added 2 more columns with worldid=xxx and ip=yyy
addColumnStr string // the string format of
the AddColumns, just a cache, used internal
+ Auth Auth // dataproxy
authentication interface
+ MaxConnLifetime time.Duration // connection max
lifetime, default: 0, set to 5m/10m when the servers provide service through
CLBs (Cloud Load Balancers)
}
```
diff --git
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/connpool/connpool.go
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/connpool/connpool.go
index d3bee80d0c..1054041cb4 100755
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/connpool/connpool.go
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/connpool/connpool.go
@@ -45,23 +45,42 @@ var (
// Dialer is the interface of a dialer that return a NetConn
type Dialer interface {
- Dial(addr string) (gnet.Conn, error)
+ // Dial dials addr and binds ctx to the returned connection; which network
+ // (TCP/UDP) to use is determined by the Dialer.
+ // Dial should use gnet.Client.DialContext() to get a connection that can be
+ // driven by a gnet event engine.
+ Dial(addr string, ctx any) (gnet.Conn, error)
+}
+
+// ConnContext holds the attributes the pool attaches to every gnet.Conn it
+// dials; the pool passes it as the ctx argument of Dialer.Dial.
+type ConnContext struct {
+ CreatedAt time.Time // time.Now() taken right before the pool dialed the connection
+ Endpoint string // the endpoint address the pool passed to Dialer.Dial
}
// EndpointRestrictedConnPool is the interface of a simple endpoint restricted
connection pool that
// the connection's remote address must be in an endpoint list, if not, it
will be closed and can
// not be used anymore, it is useful for holding the connections to a service
whose endpoints can
// be changed at runtime.
+// Best practice:
+// gnet is a high-performance networking package; the best way to use this pool is:
+// 1. call Get() to get a gnet.Conn;
+// 2. use the conn to read/write for a while (1m, for example), then put it back
+// to the pool and get a new one for load balancing; avoid putting/getting
+// frequently;
+// 3. do not switch (put and get) to a new conn in the callback of
+// gnet.Conn.AsyncWrite(buf []byte, callback AsyncCallback) or
+// gnet.Conn.AsyncWritev(bs [][]byte, callback AsyncCallback), as it may block;
+// 4. if you use TCP conns and cannot update the endpoints through service
+// discovery directly, for example because your endpoints sit behind an LB,
+// it is better to set a max lifetime for your pool, so that you can restart
+// your endpoints (RS) without data loss by:
+// 1). setting the weight of the endpoint (RS) to 0, so that no new
+// connections come in;
+// 2). waiting for the existing connections to be closed when their lifetime
+// expires;
+// 3). restarting the endpoint.
type EndpointRestrictedConnPool interface {
- // Get gets a connection
+ // Get gets a connection. It is concurrency-safe, but must not be called in
+ // the callback of gnet.Conn.AsyncWrite() or gnet.Conn.AsyncWritev().
Get() (gnet.Conn, error)
- // Put puts a connection back to the pool, if err is not nil, the
connection will be closed by the pool
+ // Put puts a connection back to the pool; if err is not nil, the connection
+ // will be closed by the pool. It is concurrency-safe, but must not be called
+ // in the callback of gnet.Conn.AsyncWrite() or gnet.Conn.AsyncWritev().
Put(conn gnet.Conn, err error)
- // UpdateEndpoints updates the endpoints the pool to dial to
+ // UpdateEndpoints updates the endpoints the pool dials to. It is not
+ // concurrency-safe.
UpdateEndpoints(all, add, del []string)
- // NumPooled returns the connection number in the pool, not the number
of all the connection that the pool created
+ // NumPooled returns the number of connections currently held in the pool,
+ // not the number of all connections the pool has created. It is
+ // concurrency-safe.
NumPooled() int
- // OnConnClosed used to notify that a connection is closed, the
connection will be removed from the pool, if err is not nil, the remote
endpoint will mark as unavailable
+ // OnConnClosed notifies the pool that a connection is closed; the connection
+ // will be removed from the pool, and if err is not nil, the remote endpoint
+ // will be marked as unavailable. It is concurrency-safe.
OnConnClosed(conn gnet.Conn, err error)
// Close closes the pool
Close()
@@ -69,7 +88,7 @@ type EndpointRestrictedConnPool interface {
// NewConnPool news a EndpointRestrictedConnPool
func NewConnPool(initEndpoints []string, connsPerEndpoint, size int,
- dialer Dialer, log logger.Logger) (EndpointRestrictedConnPool, error) {
+ dialer Dialer, log logger.Logger, maxConnLifetime time.Duration)
(EndpointRestrictedConnPool, error) {
if len(initEndpoints) == 0 {
return nil, ErrInitEndpointEmpty
}
@@ -107,7 +126,8 @@ func NewConnPool(initEndpoints []string, connsPerEndpoint,
size int,
Multiplier: 2,
Randomization: 0.5,
},
- closeCh: make(chan struct{}),
+ closeCh: make(chan struct{}),
+ maxConnLifetime: maxConnLifetime,
}
// store endpoints
@@ -143,7 +163,25 @@ type connPool struct {
backoff util.ExponentialBackoff
closeCh chan struct{}
closeOnce sync.Once
- endpointConnCounts sync.Map // store the conn count of each endpoint
+ endpointConnCounts sync.Map // store the conn count of each
endpoint
+ maxConnLifetime time.Duration // the max lifetime of a connection
+}
+
+// expired reports whether conn has lived longer than p.maxConnLifetime.
+// It always reports false when the limit is disabled (maxConnLifetime <= 0),
+// for a nil conn, and for a conn whose context is not a ConnContext (its
+// creation time is unknown, e.g. a custom Dialer did not bind the ctx).
+func (p *connPool) expired(conn gnet.Conn) bool {
+	if p.maxConnLifetime <= 0 || conn == nil {
+		return false
+	}
+
+	// the comma-ok assertion also fails for a nil context, so no separate nil check is needed
+	connCtx, ok := conn.Context().(ConnContext)
+	if !ok {
+		return false
+	}
+	return time.Since(connCtx.CreatedAt) > p.maxConnLifetime
}
func (p *connPool) Get() (gnet.Conn, error) {
@@ -204,7 +242,7 @@ func (p *connPool) newConn() (gnet.Conn, error) {
func (p *connPool) dialNewConn(ep string) (gnet.Conn, error) {
p.log.Debug("dialNewConn()")
- conn, err := p.dialer.Dial(ep)
+ conn, err := p.dialer.Dial(ep, ConnContext{CreatedAt: time.Now(),
Endpoint: ep})
if err != nil {
p.markUnavailable(ep)
return nil, err
@@ -278,6 +316,15 @@ func (p *connPool) put(conn gnet.Conn, err error,
isNewConn bool) {
return
}
+ // if conn is expired (older than maxConnLifetime), close it instead of pooling it
+ if p.expired(conn) {
+ p.log.Debug("connection expired, close it, addr:", addr, ",
err:", err)
+ CloseConn(conn, defaultConnCloseDelay)
+ // closing the conn leaves fewer usable conns, and the conn count of addr's
+ // endpoint may become unbalanced, so dial a replacement right here even though
+ // this re-enters put() (appendNewConn hands the new conn to put());
+ // dial failures are logged inside appendNewConn, so the error is discarded
+ _ = p.appendNewConn(addr)
+ return
+ }
+
select {
case p.connChan <- conn:
// update the conn count
@@ -488,6 +535,17 @@ func (p *connPool) recoverAndRebalance() {
reBalanceTicker := time.NewTicker(defaultConnCloseDelay +
30*time.Second)
defer reBalanceTicker.Stop()
+	// Clean expired conns every minute. When the lifetime limit is disabled
+	// (maxConnLifetime <= 0) the channel stays nil; receiving from a nil channel
+	// blocks forever, so that select case is simply never chosen. Do NOT add a
+	// `default:` case to the select below: it makes the select non-blocking and
+	// turns this loop into a busy spin (and delays closeCh handling).
+	var cleanExpiredConnCh <-chan time.Time
+	if p.maxConnLifetime > 0 {
+		cleanExpiredConnTicker := time.NewTicker(1 * time.Minute)
+		defer cleanExpiredConnTicker.Stop()
+		cleanExpiredConnCh = cleanExpiredConnTicker.C
+	}
+
for {
select {
case <-recoverTicker.C:
@@ -502,10 +560,79 @@ func (p *connPool) recoverAndRebalance() {
p.rebalance()
case <-p.closeCh:
return
+		case <-cleanExpiredConnCh:
+			p.cleanExpiredConns()
}
}
}
+// getRemoteAddr returns the address to dial when replacing conn: the endpoint
+// recorded in its ConnContext at dial time if there is one, otherwise the
+// string form of conn.RemoteAddr(). It returns "" if neither is available.
+//
+// The recorded endpoint is preferred because it is exactly the string the pool
+// dialed, whereas RemoteAddr() reports the peer's IP:port; the two differ when
+// the endpoint is a host name (e.g. a CLB domain), and re-dialing the resolved
+// IP would pin the replacement conn to a possibly stale address.
+func getRemoteAddr(conn gnet.Conn) string {
+	if conn == nil {
+		return ""
+	}
+
+	if connCtx, ok := conn.Context().(ConnContext); ok && connCtx.Endpoint != "" {
+		return connCtx.Endpoint
+	}
+
+	if addr := conn.RemoteAddr(); addr != nil {
+		return addr.String()
+	}
+	return ""
+}
+
+// cleanExpiredConns takes up to cap(p.connChan) idle connections out of the
+// pool without blocking, returns the unexpired ones to the pool (closing any
+// that no longer fit), then closes every expired one and dials a replacement
+// to the address reported by getRemoteAddr.
+func (p *connPool) cleanExpiredConns() {
+	p.log.Debug("cleanExpiredConns()")
+
+	var alive, stale []gnet.Conn
+drain:
+	for n := cap(p.connChan); n > 0; n-- {
+		select {
+		case conn := <-p.connChan:
+			if !p.expired(conn) {
+				alive = append(alive, conn)
+			} else {
+				stale = append(stale, conn)
+			}
+		default:
+			// the pool is empty, nothing more to inspect
+			break drain
+		}
+	}
+
+	// return the still-usable conns; if the channel has no room left
+	// (e.g. concurrent Put calls refilled it), close the surplus ones
+	for _, conn := range alive {
+		select {
+		case p.connChan <- conn:
+		default:
+			CloseConn(conn, defaultConnCloseDelay)
+		}
+	}
+
+	// replace every expired conn with a new one dialed to the same address
+	for _, conn := range stale {
+		addr := getRemoteAddr(conn)
+		p.log.Debug("connection expired, close it, addr:", addr, ", err:", nil)
+		CloseConn(conn, defaultConnCloseDelay)
+		_ = p.appendNewConn(addr) // dial failures are logged by appendNewConn
+	}
+}
+
func (p *connPool) dump() {
p.log.Debug("all endpoints:")
eps := p.endpoints.Load()
@@ -542,7 +669,7 @@ func (p *connPool) recover() bool {
}
if time.Since(lastUnavailable) > p.backoff.Next(retries) {
// try to create new conn
- conn, err := p.dialer.Dial(key.(string))
+ conn, err := p.dialer.Dial(key.(string),
ConnContext{CreatedAt: time.Now(), Endpoint: key.(string)})
if err == nil {
p.log.Debug("endpoint recovered, addr: ", key)
p.put(conn, nil, true)
@@ -675,15 +802,11 @@ func (p *connPool) rebalance() {
// add new conn
for i := currentCount; i < expectedConnPerEndpoint; i++
{
- conn, err := p.dialNewConn(addr)
- if err == nil {
- p.log.Debug("adding connection for
addr: ", addr)
- p.put(conn, nil, true)
- rebalanced = true
- } else {
- p.log.Warn("failed to add connection
during rebalancing, addr: ", addr, ", err: ", err)
- break
+				if err := p.appendNewConn(addr); err != nil {
+					// a failed dial is logged by appendNewConn and the endpoint is
+					// marked unavailable by dialNewConn; stop adding conns to it in
+					// this round instead of re-dialing a failing endpoint again and
+					// again, recover() retries it after its backoff
+					break
}
+ rebalanced = true
}
} else if currentCount > expectedConnPerEndpoint {
rebalanced = true
@@ -699,15 +822,11 @@ func (p *connPool) rebalance() {
return true
}
for i := 0; i < expectedConnPerEndpoint; i++ {
- conn, err := p.dialNewConn(addr)
- if err == nil {
- p.log.Debug("adding connection for addr: ",
addr)
- p.put(conn, nil, true)
- rebalanced = true
- } else {
- p.log.Warn("failed to add connection during
rebalancing, addr: ", addr, ", err: ", err)
- break
+			if err := p.appendNewConn(addr); err != nil {
+				// a failed dial is logged by appendNewConn and the endpoint is
+				// marked unavailable by dialNewConn; stop adding conns to it in
+				// this round instead of re-dialing a failing endpoint again and
+				// again, recover() retries it after its backoff
+				break
}
+ rebalanced = true
}
return true
})
@@ -717,6 +836,22 @@ func (p *connPool) rebalance() {
}
}
+// appendNewConn dials a new connection to addr and, on success, hands it to
+// the pool via put(conn, nil, true). An empty addr is rejected with an error;
+// a dial failure is logged and returned.
+func (p *connPool) appendNewConn(addr string) error {
+	if len(addr) == 0 {
+		return errors.New("addr is empty")
+	}
+
+	conn, err := p.dialNewConn(addr)
+	if err == nil {
+		p.log.Debug("adding connection for addr: ", addr)
+		p.put(conn, nil, true)
+		return nil
+	}
+
+	p.log.Warn("failed to add connection, addr: ", addr, ", err: ", err)
+	return err
+}
+
func (p *connPool) removeEndpointConn(addr string, count int) {
var leftConns []gnet.Conn
var removed int
@@ -730,7 +865,7 @@ loop:
}
if remoteAddr.String() == addr {
- p.log.Info("reducing connection for addr: ",
addr)
+ p.log.Debug("reducing connection for addr: ",
addr)
// we do not decrease conn count here, if the
frequence of rebalancing is less then defaultConnCloseDelay, may lead to an
inaccurate expected conn count per endpoint
CloseConn(conn, defaultConnCloseDelay)
removed++
diff --git
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/client.go
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/client.go
index 5ea6781276..3e1238ce94 100755
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/client.go
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/client.go
@@ -166,7 +166,7 @@ func (c *client) initConns() error {
// minimum connection number per endpoint is 1
connsPerEndpoint := int(math.Ceil(float64(c.options.WorkerNum) * 1.2 /
float64(epLen)))
- pool, err := connpool.NewConnPool(endpoints, connsPerEndpoint, 512, c,
c.log)
+ pool, err := connpool.NewConnPool(endpoints, connsPerEndpoint, 512, c,
c.log, c.options.MaxConnLifetime)
if err != nil {
return err
}
@@ -176,7 +176,7 @@ func (c *client) initConns() error {
}
func (c *client) initFramer() error {
- framer, err := framer.NewLengthField(framer.LengthFieldCfg{
+ fr, err := framer.NewLengthField(framer.LengthFieldCfg{
MaxFrameLen: 64 * 1024,
FieldOffset: 0,
FieldLength: 4,
@@ -186,7 +186,7 @@ func (c *client) initFramer() error {
if err != nil {
return err
}
- c.framer = framer
+ c.framer = fr
return nil
}
@@ -211,8 +211,8 @@ func (c *client) initWorkers() error {
return nil
}
-func (c *client) Dial(addr string) (gnet.Conn, error) {
- return c.netClient.Dial("tcp", addr)
+// Dial is the connpool.Dialer used by the client's connection pool: it dials
+// addr over TCP with the client's gnet client and passes ctx to DialContext,
+// so ctx becomes the context of the returned connection.
+func (c *client) Dial(addr string, ctx any) (gnet.Conn, error) {
+ return c.netClient.DialContext("tcp", addr, ctx)
}
func (c *client) Send(ctx context.Context, msg Message) error {
diff --git
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/options.go
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/options.go
index 490e90ce79..5d4692f077 100755
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/options.go
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/options.go
@@ -23,9 +23,10 @@ import (
"github.com/apache/inlong/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/util"
+ "github.com/prometheus/client_golang/prometheus"
+
"github.com/apache/inlong/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/bufferpool"
"github.com/apache/inlong/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/logger"
- "github.com/prometheus/client_golang/prometheus"
)
const (
@@ -78,6 +79,7 @@ type Options struct {
AddColumns map[string]string // addition columns to
add to the message, for example: __addcol1__worldid=xxx&__addcol2__ip=yyy, all
the message will be added 2 more columns with worldid=xxx and ip=yyy
addColumnStr string // the string format of
the AddColumns, just a cache, used internal
Auth Auth // dataproxy
authentication interface
+ MaxConnLifetime time.Duration // connection max
lifetime, default: 0 (a value <= 0 disables the limit), set to 5m/10m when the servers provide service through
CLBs (Cloud Load Balancers)
}
// ValidateAndSetDefault validates an options and set up the default values
diff --git
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/options_basic.go
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/options_basic.go
index 5265a1a628..de3c228769 100755
---
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/options_basic.go
+++
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/options_basic.go
@@ -182,3 +182,10 @@ func WithAuth(auth Auth) Option {
o.Auth = auth
}
}
+
+// WithMaxConnLifetime sets MaxConnLifetime, the maximum time a pooled
+// connection may live before the pool closes it and dials a replacement;
+// a value <= 0 (the default is 0) disables the limit.
+func WithMaxConnLifetime(lifetime time.Duration) Option {
+	return func(o *Options) { o.MaxConnLifetime = lifetime }
+}