This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 542b2e8684 [INLONG-16070][SDK] Fix potential block in Golang SDK
(#10674)
542b2e8684 is described below
commit 542b2e8684e8767c336579020f0e05cfc9e27ab0
Author: gunli <[email protected]>
AuthorDate: Fri Jul 19 14:08:33 2024 +0800
[INLONG-16070][SDK] Fix potential block in Golang SDK (#10674)
Co-authored-by: gunli <[email protected]>
---
.../dataproxy-sdk-golang/dataproxy/client.go | 28 +++++---
.../dataproxy-sdk-golang/dataproxy/worker.go | 83 +++++++++++++++++-----
2 files changed, 86 insertions(+), 25 deletions(-)
diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/client.go b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/client.go
index b6f3a0f2af..cce1e4e4ab 100755
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/client.go
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/client.go
@@ -91,23 +91,23 @@ func NewClient(opts ...Option) (Client, error) {
func (c *client) initAll() error {
// the following initialization order must not be changed。
- err := c.initDiscoverer()
+ err := c.initMetrics()
if err != nil {
return err
}
- err = c.initNetClient()
+ err = c.initDiscoverer()
if err != nil {
return err
}
- err = c.initConns()
+ err = c.initNetClient()
if err != nil {
return err
}
- err = c.initFramer()
+ err = c.initConns()
if err != nil {
return err
}
- err = c.initMetrics()
+ err = c.initFramer()
if err != nil {
return err
}
@@ -162,10 +162,8 @@ func (c *client) initConns() error {
endpoints[i] = epList[i].Addr
}
- // maximum connection number per endpoint is 3
- connsPerEndpoint := c.options.WorkerNum/epLen + 1
- connsPerEndpoint = int(math.Min(3, float64(connsPerEndpoint)))
-
+ // minimum connection number per endpoint is 1
+ connsPerEndpoint := int(math.Ceil(float64(c.options.WorkerNum) * 1.2 /
float64(epLen)))
pool, err := connpool.NewConnPool(endpoints, connsPerEndpoint, 512, c,
c.log)
if err != nil {
return err
@@ -296,6 +294,18 @@ func (c *client) OnClose(conn gnet.Conn, err error)
gnet.Action {
c.log.Warn("connection closed: ", conn.RemoteAddr(), ", err: ",
err)
c.metrics.incError(errConnClosedByPeer.strCode)
}
+
+ // delete this conn from conn pool
+ if c.connPool != nil {
+ c.connPool.OnConnClosed(conn, err)
+ }
+
+ if err != nil {
+ for _, w := range c.workers {
+ w.onConnClosed(conn, err)
+ }
+ }
+
return gnet.None
}
diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/worker.go b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/worker.go
index 2991364b82..935c69732c 100755
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/worker.go
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/worker.go
@@ -25,11 +25,12 @@ import (
"github.com/gofrs/uuid"
+ "github.com/panjf2000/gnet/v2"
+ "go.uber.org/atomic"
+
"github.com/apache/inlong/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/bufferpool"
"github.com/apache/inlong/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/logger"
"github.com/apache/inlong/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/syncx"
- "github.com/panjf2000/gnet/v2"
- "go.uber.org/atomic"
)
const (
@@ -114,6 +115,7 @@ type worker struct {
pendingBatches map[string]*batchReq // pending batches
unackedBatches map[string]*batchReq // sent but not
acknowledged batches
sendFailedBatches chan *sendFailedBatchReq // send failed batches
channel
+ updateConnChan chan error // update conn channel
retryBatches chan *batchReq // retry batches channel
responseBatches chan *batchRsp // batch response channel
batchTimeoutTicker *time.Ticker // batch timeout ticker
@@ -145,6 +147,7 @@ func newWorker(cli *client, index int, opts *Options)
(*worker, error) {
pendingBatches: make(map[string]*batchReq),
unackedBatches: make(map[string]*batchReq),
sendFailedBatches: make(chan *sendFailedBatchReq,
opts.MaxPendingMessages),
+ updateConnChan: make(chan error, 64),
retryBatches: make(chan *batchReq,
opts.MaxPendingMessages),
responseBatches: make(chan *batchRsp,
opts.MaxPendingMessages),
batchTimeoutTicker:
time.NewTicker(opts.BatchingMaxPublishDelay),
@@ -166,6 +169,7 @@ func newWorker(cli *client, index int, opts *Options)
(*worker, error) {
if err != nil {
return nil, err
}
+ w.log.Debug("use conn: ", conn.RemoteAddr().String())
w.setConn(conn)
// start the worker
@@ -184,7 +188,7 @@ func (w *worker) start() {
go func() {
defer func() {
if rec := recover(); rec != nil {
- w.log.Errorf("panic:", rec)
+ w.log.Error("panic:", rec)
w.log.Error(string(debug.Stack()))
w.metrics.incError(errServerPanic.getStrCode())
}
@@ -220,6 +224,12 @@ func (w *worker) start() {
case <-w.updateConnTicker.C:
// update connection periodically
w.handleUpdateConn()
+ case e, ok := <-w.updateConnChan:
+ if !ok {
+ continue
+ }
+ // update conn
+ w.updateConn(nil, e)
case batch, ok := <-w.sendFailedBatches:
// handle send failed batches
if !ok {
@@ -369,7 +379,7 @@ func (w *worker) sendBatch(b *batchReq, retryOnFail bool) {
b.lastSendTime = time.Now()
b.encode()
- //error callback
+ // error callback
onErr := func(c gnet.Conn, e error, inCallback bool) {
defer func() {
if rec := recover(); rec != nil {
@@ -380,7 +390,7 @@ func (w *worker) sendBatch(b *batchReq, retryOnFail bool) {
}()
w.metrics.incError(errConnWriteFailed.getStrCode())
- w.log.Error("send batch failed, err:", e)
+ w.log.Error("send batch failed, err: ", e, ", inCallback: ",
inCallback, ", logNum:", len(b.dataReqs))
// close already
if w.getState() == stateClosed {
@@ -388,19 +398,21 @@ func (w *worker) sendBatch(b *batchReq, retryOnFail bool)
{
return
}
- // network error, change a new connection
- w.updateConn(c, errConnWriteFailed)
-
// important:when AsyncWrite() call succeed, the batch will be
put into w.unackedBatches,now it failed, we need
// to delete from w.unackedBatches, as onErr() is call
concurrently in different goroutine, we can not delete it
// from this callback directly, or will be panic, so we put
into the w.sendFailedBatches channel, and it will be
// deleted and retried in handleSendFailed() one by one
if inCallback {
+ // can not call w.updateConn() in callback, updateConn() may open new conn, which will call gnet.Client.Dial()
+ // gnet.Client.Dial() and this callback run in the same goroutine, so it would block
+ w.updateConnAsync(errConnWriteFailed)
w.sendFailedBatches <- &sendFailedBatchReq{batch: b,
retry: retryOnFail}
return
}
// in a same goroutine, retry it directly
+ // network error, change a new connection
+ w.updateConn(c, errConnWriteFailed)
if retryOnFail {
// w.retryBatches <- b
w.backoffRetry(context.Background(), b)
@@ -411,15 +423,18 @@ func (w *worker) sendBatch(b *batchReq, retryOnFail bool)
{
// very important:'cause we use gnet, we must call AsyncWrite to send
data in goroutines that are different from gnet.OnTraffic() callback
conn := w.getConn()
+ if b.retries > 0 {
+ w.log.Debug("retry batch to conn:", conn.RemoteAddr(), ",
workerID:", w.index, ", batchID:", b.batchID, ", logNum:", len(b.dataReqs))
+ }
err := conn.AsyncWrite(b.buffer.Bytes(), func(c gnet.Conn, e error)
error {
if e != nil {
- onErr(c, e, true) //error callback
+ onErr(c, e, true) // error callback
}
return nil
})
if err != nil {
- onErr(conn, err, false) //error callback
+ onErr(conn, err, false) // error callback
return
}
@@ -485,6 +500,7 @@ func (w *worker) backoffRetry(ctx context.Context, batch
*batchReq) {
}
// put the batch into the retry channel
+ w.log.Debug("put to retry...")
w.retryBatches <- batch
case <-ctx.Done():
// in the case the process exit, just end up the batch
sending routine
@@ -501,6 +517,7 @@ func (w *worker) handleRetry(batch *batchReq, retryOnFail
bool) {
}
// retry
+ w.log.Debug("retry batch...", ", workerID:", w.index, ", batchID:",
batch.batchID)
w.metrics.incRetry(w.indexStr)
w.sendBatch(batch, retryOnFail)
}
@@ -553,17 +570,23 @@ func (w *worker) handleSendHeartbeat() {
bb := w.bufferPool.Get()
bytes := hb.encode(bb)
- onErr := func(c gnet.Conn, e error) {
+ onErr := func(c gnet.Conn, e error, inCallback bool) {
w.metrics.incError(errConnWriteFailed.getStrCode())
w.log.Error("send heartbeat failed, err:", e)
- w.updateConn(c, errConnWriteFailed)
+ if inCallback {
+ // can not call w.updateConn() in callback, updateConn() may open new conn, which will call gnet.Client.Dial()
+ // gnet.Client.Dial() and this callback run in the same goroutine, so it would block
+ w.updateConnAsync(errConnWriteFailed)
+ } else {
+ w.updateConn(c, errConnWriteFailed)
+ }
}
// very important:'cause we use gnet, we must call AsyncWrite to send
data in goroutines that are different from gnet.OnTraffic() callback
conn := w.getConn()
err := conn.AsyncWrite(bytes, func(c gnet.Conn, e error) error {
if e != nil {
- onErr(c, e)
+ onErr(c, e, true)
}
// recycle the buffer
w.bufferPool.Put(bb)
@@ -571,7 +594,7 @@ func (w *worker) handleSendHeartbeat() {
})
if err != nil {
- onErr(conn, err)
+ onErr(conn, err, false)
// recycle the buffer
w.bufferPool.Put(bb)
}
@@ -681,7 +704,9 @@ func (w *worker) handleClose(req *closeReq) {
close(w.retryBatches)
// close the send failed channel
close(w.sendFailedBatches)
- // close the response channel
+ // close the update conn chan
+ close(w.updateConnChan)
+ // close the response chan
close(w.responseBatches)
// close the done channel of the close request to notify the
close is done
close(req.doneCh)
@@ -713,6 +738,18 @@ func (w *worker) handleUpdateConn() {
w.updateConn(nil, nil)
}
+// updateConnAsync asks the worker loop to switch to a new connection without
+// blocking the caller: the request is put into updateConnChan, which the
+// worker loop in start() consumes by calling w.updateConn(nil, err).
+// It is used from gnet callbacks (AsyncWrite callbacks and OnClose), where
+// calling updateConn() directly could block.
+func (w *worker) updateConnAsync(err error) {
+	// the worker is already closed, nobody will consume the request
+	if w.getState() == stateClosed {
+		return
+	}
+
+	// handleClose() closes updateConnChan, while this method is called from
+	// gnet callback goroutines, so the channel may be closed between the state
+	// check above and the send below. Sending on a closed channel panics even
+	// inside a select with a default branch, so recover and drop the request.
+	defer func() {
+		if rec := recover(); rec != nil {
+			w.log.Debug("worker closed, update conn request dropped: ", rec)
+		}
+	}()
+
+	// non-blocking send: if the channel is full, updates are already pending
+	select {
+	case w.updateConnChan <- err:
+	default:
+	}
+}
+
func (w *worker) updateConn(old gnet.Conn, err error) {
newConn, newErr := w.client.getConn()
if newErr != nil {
@@ -726,9 +763,16 @@ func (w *worker) updateConn(old gnet.Conn, err error) {
oldConn = w.getConn()
}
- w.client.putConn(oldConn, err)
ok := w.casConn(oldConn, newConn)
if ok {
+ // put back to pool only if there is no error
+ if err == nil {
+ w.client.putConn(oldConn, err)
+ } else { // nolint:staticcheck
+ // if there are some errors, there are basically conn
closed by peer,
+ // gnet will call Client.OnClose() to delete it from
the pool,
+ // it won't be wrong even though we do not put it back
here
+ }
w.metrics.incUpdateConn(getErrorCode(err))
} else {
w.client.putConn(newConn, nil)
@@ -743,6 +787,13 @@ func (w *worker) getConn() gnet.Conn {
return w.conn.Load().(gnet.Conn)
}
+// onConnClosed is called by client.OnClose() for every worker when a
+// connection is closed with a non-nil error. If the closed connection is the
+// one this worker currently uses, an asynchronous connection update is requested.
+func (w *worker) onConnClosed(conn gnet.Conn, err error) {
+	// a connection owned by another worker is none of our business
+	if w.getConn() != conn {
+		return
+	}
+	w.updateConnAsync(err)
+}
+
// casConn replaces the worker's current connection with newConn only if the
// current one is still oldConn, using CompareAndSwap on w.conn; it reports
// whether the replacement took place.
func (w *worker) casConn(oldConn, newConn gnet.Conn) bool {
	swapped := w.conn.CompareAndSwap(oldConn, newConn)
	return swapped
}