This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 542b2e8684 [INLONG-16070][SDK] Fix potential block in Golang SDK 
(#10674)
542b2e8684 is described below

commit 542b2e8684e8767c336579020f0e05cfc9e27ab0
Author: gunli <[email protected]>
AuthorDate: Fri Jul 19 14:08:33 2024 +0800

    [INLONG-16070][SDK] Fix potential block in Golang SDK (#10674)
    
    Co-authored-by: gunli <[email protected]>
---
 .../dataproxy-sdk-golang/dataproxy/client.go       | 28 +++++---
 .../dataproxy-sdk-golang/dataproxy/worker.go       | 83 +++++++++++++++++-----
 2 files changed, 86 insertions(+), 25 deletions(-)

diff --git 
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/client.go 
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/client.go
index b6f3a0f2af..cce1e4e4ab 100755
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/client.go
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/client.go
@@ -91,23 +91,23 @@ func NewClient(opts ...Option) (Client, error) {
 
 func (c *client) initAll() error {
        // the following initialization order must not be changed。
-       err := c.initDiscoverer()
+       err := c.initMetrics()
        if err != nil {
                return err
        }
-       err = c.initNetClient()
+       err = c.initDiscoverer()
        if err != nil {
                return err
        }
-       err = c.initConns()
+       err = c.initNetClient()
        if err != nil {
                return err
        }
-       err = c.initFramer()
+       err = c.initConns()
        if err != nil {
                return err
        }
-       err = c.initMetrics()
+       err = c.initFramer()
        if err != nil {
                return err
        }
@@ -162,10 +162,8 @@ func (c *client) initConns() error {
                endpoints[i] = epList[i].Addr
        }
 
-       // maximum connection number per endpoint is 3
-       connsPerEndpoint := c.options.WorkerNum/epLen + 1
-       connsPerEndpoint = int(math.Min(3, float64(connsPerEndpoint)))
-
+       // minimum connection number per endpoint is 1
+       connsPerEndpoint := int(math.Ceil(float64(c.options.WorkerNum) * 1.2 / 
float64(epLen)))
        pool, err := connpool.NewConnPool(endpoints, connsPerEndpoint, 512, c, 
c.log)
        if err != nil {
                return err
@@ -296,6 +294,18 @@ func (c *client) OnClose(conn gnet.Conn, err error) 
gnet.Action {
                c.log.Warn("connection closed: ", conn.RemoteAddr(), ", err: ", 
err)
                c.metrics.incError(errConnClosedByPeer.strCode)
        }
+
+       // delete this conn from conn pool
+       if c.connPool != nil {
+               c.connPool.OnConnClosed(conn, err)
+       }
+
+       if err != nil {
+               for _, w := range c.workers {
+                       w.onConnClosed(conn, err)
+               }
+       }
+
        return gnet.None
 }
 
diff --git 
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/worker.go 
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/worker.go
index 2991364b82..935c69732c 100755
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/worker.go
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/worker.go
@@ -25,11 +25,12 @@ import (
 
        "github.com/gofrs/uuid"
 
+       "github.com/panjf2000/gnet/v2"
+       "go.uber.org/atomic"
+
        
"github.com/apache/inlong/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/bufferpool"
        
"github.com/apache/inlong/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/logger"
        
"github.com/apache/inlong/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/syncx"
-       "github.com/panjf2000/gnet/v2"
-       "go.uber.org/atomic"
 )
 
 const (
@@ -114,6 +115,7 @@ type worker struct {
        pendingBatches     map[string]*batchReq     // pending batches
        unackedBatches     map[string]*batchReq     // sent but not 
acknowledged batches
        sendFailedBatches  chan *sendFailedBatchReq // send failed batches 
channel
+       updateConnChan     chan error               // update conn channel
        retryBatches       chan *batchReq           // retry batches  channel
        responseBatches    chan *batchRsp           // batch response channel
        batchTimeoutTicker *time.Ticker             // batch timeout ticker
@@ -145,6 +147,7 @@ func newWorker(cli *client, index int, opts *Options) 
(*worker, error) {
                pendingBatches:     make(map[string]*batchReq),
                unackedBatches:     make(map[string]*batchReq),
                sendFailedBatches:  make(chan *sendFailedBatchReq, 
opts.MaxPendingMessages),
+               updateConnChan:     make(chan error, 64),
                retryBatches:       make(chan *batchReq, 
opts.MaxPendingMessages),
                responseBatches:    make(chan *batchRsp, 
opts.MaxPendingMessages),
                batchTimeoutTicker: 
time.NewTicker(opts.BatchingMaxPublishDelay),
@@ -166,6 +169,7 @@ func newWorker(cli *client, index int, opts *Options) 
(*worker, error) {
        if err != nil {
                return nil, err
        }
+       w.log.Debug("use conn: ", conn.RemoteAddr().String())
        w.setConn(conn)
 
        // start the worker
@@ -184,7 +188,7 @@ func (w *worker) start() {
        go func() {
                defer func() {
                        if rec := recover(); rec != nil {
-                               w.log.Errorf("panic:", rec)
+                               w.log.Error("panic:", rec)
                                w.log.Error(string(debug.Stack()))
                                w.metrics.incError(errServerPanic.getStrCode())
                        }
@@ -220,6 +224,12 @@ func (w *worker) start() {
                        case <-w.updateConnTicker.C:
                                // update connection periodically
                                w.handleUpdateConn()
+                       case e, ok := <-w.updateConnChan:
+                               if !ok {
+                                       continue
+                               }
+                               // update conn
+                               w.updateConn(nil, e)
                        case batch, ok := <-w.sendFailedBatches:
                                // handle send failed batches
                                if !ok {
@@ -369,7 +379,7 @@ func (w *worker) sendBatch(b *batchReq, retryOnFail bool) {
        b.lastSendTime = time.Now()
        b.encode()
 
-       //error callback
+       // error callback
        onErr := func(c gnet.Conn, e error, inCallback bool) {
                defer func() {
                        if rec := recover(); rec != nil {
@@ -380,7 +390,7 @@ func (w *worker) sendBatch(b *batchReq, retryOnFail bool) {
                }()
 
                w.metrics.incError(errConnWriteFailed.getStrCode())
-               w.log.Error("send batch failed, err:", e)
+               w.log.Error("send batch failed, err: ", e, ", inCallback: ", 
inCallback, ", logNum:", len(b.dataReqs))
 
                // close already
                if w.getState() == stateClosed {
@@ -388,19 +398,21 @@ func (w *worker) sendBatch(b *batchReq, retryOnFail bool) 
{
                        return
                }
 
-               // network error, change a new connection
-               w.updateConn(c, errConnWriteFailed)
-
                // important:when AsyncWrite() call succeed, the batch will be 
put into w.unackedBatches,now it failed, we need
                // to delete from w.unackedBatches, as onErr() is call 
concurrently in different goroutine, we can not delete it
                // from this callback directly, or will be panic, so we put 
into the w.sendFailedBatches channel, and it will be
                // deleted and retried in handleSendFailed() one by one
                if inCallback {
+                       // can not call w.updateConn() in callback, 
updateConn() may open new conn, which will call gent.Client.Dial()
+                       // gent.Client.Dial() and this callback are run in a 
same goroutine, it will be blocked
+                       w.updateConnAsync(errConnWriteFailed)
                        w.sendFailedBatches <- &sendFailedBatchReq{batch: b, 
retry: retryOnFail}
                        return
                }
 
                // in a same goroutine, retry it directly
+               // network error, change a new connection
+               w.updateConn(c, errConnWriteFailed)
                if retryOnFail {
                        // w.retryBatches <- b
                        w.backoffRetry(context.Background(), b)
@@ -411,15 +423,18 @@ func (w *worker) sendBatch(b *batchReq, retryOnFail bool) 
{
 
        // very important:'cause we use gnet, we must call AsyncWrite to send 
data in goroutines that are different from gnet.OnTraffic() callback
        conn := w.getConn()
+       if b.retries > 0 {
+               w.log.Debug("retry batch to conn:", conn.RemoteAddr(), ", 
workerID:", w.index, ", batchID:", b.batchID, ", logNum:", len(b.dataReqs))
+       }
        err := conn.AsyncWrite(b.buffer.Bytes(), func(c gnet.Conn, e error) 
error {
                if e != nil {
-                       onErr(c, e, true) //error callback
+                       onErr(c, e, true) // error callback
                }
                return nil
        })
 
        if err != nil {
-               onErr(conn, err, false) //error callback
+               onErr(conn, err, false) // error callback
                return
        }
 
@@ -485,6 +500,7 @@ func (w *worker) backoffRetry(ctx context.Context, batch 
*batchReq) {
                        }
 
                        // put the batch into the retry channel
+                       w.log.Debug("put to retry...")
                        w.retryBatches <- batch
                case <-ctx.Done():
                        // in the case the process exit, just end up the batch 
sending routine
@@ -501,6 +517,7 @@ func (w *worker) handleRetry(batch *batchReq, retryOnFail 
bool) {
        }
 
        // retry
+       w.log.Debug("retry batch...", ", workerID:", w.index, ", batchID:", 
batch.batchID)
        w.metrics.incRetry(w.indexStr)
        w.sendBatch(batch, retryOnFail)
 }
@@ -553,17 +570,23 @@ func (w *worker) handleSendHeartbeat() {
        bb := w.bufferPool.Get()
        bytes := hb.encode(bb)
 
-       onErr := func(c gnet.Conn, e error) {
+       onErr := func(c gnet.Conn, e error, inCallback bool) {
                w.metrics.incError(errConnWriteFailed.getStrCode())
                w.log.Error("send heartbeat failed, err:", e)
-               w.updateConn(c, errConnWriteFailed)
+               if inCallback {
+                       // can not call w.updateConn() in callback, 
updateConn() may open new conn, which will call gent.Client.Dial()
+                       // gent.Client.Dial() and this callback are run in a 
same goroutine, it will be blocked
+                       w.updateConnAsync(errConnWriteFailed)
+               } else {
+                       w.updateConn(c, errConnWriteFailed)
+               }
        }
 
        // very important:'cause we use gnet, we must call AsyncWrite to send 
data in goroutines that are different from gnet.OnTraffic() callback
        conn := w.getConn()
        err := conn.AsyncWrite(bytes, func(c gnet.Conn, e error) error {
                if e != nil {
-                       onErr(c, e)
+                       onErr(c, e, true)
                }
                // recycle the buffer
                w.bufferPool.Put(bb)
@@ -571,7 +594,7 @@ func (w *worker) handleSendHeartbeat() {
        })
 
        if err != nil {
-               onErr(conn, err)
+               onErr(conn, err, false)
                // recycle the buffer
                w.bufferPool.Put(bb)
        }
@@ -681,7 +704,9 @@ func (w *worker) handleClose(req *closeReq) {
                close(w.retryBatches)
                // close the send failed channel
                close(w.sendFailedBatches)
-               // close the response channel
+               // close the update conn chan
+               close(w.updateConnChan)
+               // close the response chan
                close(w.responseBatches)
                // close the done channel of the close request to notify the 
close is done
                close(req.doneCh)
@@ -713,6 +738,18 @@ func (w *worker) handleUpdateConn() {
        w.updateConn(nil, nil)
 }
 
+// updateConnAsync asks the worker goroutine to switch to a new connection
+// instead of doing it inline. It is meant for callers running inside gnet
+// callbacks, where calling updateConn() directly may block.
+func (w *worker) updateConnAsync(err error) {
+       // the worker is already closed, nothing to update
+       if w.getState() == stateClosed {
+               return
+       }
+
+       // this method runs in gnet callback goroutines, while handleClose()
+       // closes w.updateConnChan in the worker goroutine; the state check above
+       // is not atomic with the send below, and a send on a closed channel
+       // panics even inside a select with a default case, so recover here
+       // instead of crashing the gnet event loop
+       defer func() {
+               if rec := recover(); rec != nil {
+                       w.log.Debug("update conn async ignored, worker is closing: ", rec)
+               }
+       }()
+
+       // non-blocking send: if the channel is full, enough updates are already
+       // queued for the worker goroutine, so this request can be dropped
+       select {
+       case w.updateConnChan <- err:
+       default:
+       }
+}
+
 func (w *worker) updateConn(old gnet.Conn, err error) {
        newConn, newErr := w.client.getConn()
        if newErr != nil {
@@ -726,9 +763,16 @@ func (w *worker) updateConn(old gnet.Conn, err error) {
                oldConn = w.getConn()
        }
 
-       w.client.putConn(oldConn, err)
        ok := w.casConn(oldConn, newConn)
        if ok {
+               // put back to pool only if there is no error
+               if err == nil {
+                       w.client.putConn(oldConn, err)
+               } else { // nolint:staticcheck
+                       // if there are some errors, there are basically conn 
closed by peer,
+                       // gnet will call Client.OnClose() to delete it from 
the pool,
+                       // it won't be wrong even though we do not put it back 
here
+               }
                w.metrics.incUpdateConn(getErrorCode(err))
        } else {
                w.client.putConn(newConn, nil)
@@ -743,6 +787,13 @@ func (w *worker) getConn() gnet.Conn {
        return w.conn.Load().(gnet.Conn)
 }
 
+// onConnClosed is notified by the client when conn has been closed with an
+// error; if conn is the connection this worker currently holds, it schedules
+// an asynchronous switch to a new connection.
+func (w *worker) onConnClosed(conn gnet.Conn, err error) {
+       // another worker's connection, nothing to do for this one
+       if w.getConn() != conn {
+               return
+       }
+       w.updateConnAsync(err)
+}
+
+// casConn replaces the worker's current connection with newConn via
+// CompareAndSwap, only if it still holds oldConn, and reports whether the swap
+// happened. NOTE(review): w.conn is presumably an atomic value (see Load() in
+// getConn()) — confirm against the struct definition.
 func (w *worker) casConn(oldConn, newConn gnet.Conn) bool {
        return w.conn.CompareAndSwap(oldConn, newConn)
 }

Reply via email to