This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new ed9009a640 [INLONG-8639][SDK] Improve send failed logic (#8640)
ed9009a640 is described below

commit ed9009a640319fbb2929f80ca022fbf4e4463a9a
Author: gunli <[email protected]>
AuthorDate: Sun Aug 6 15:54:43 2023 +0800

    [INLONG-8639][SDK] Improve send failed logic (#8640)
    
    Co-authored-by: gunli <[email protected]>
---
 .../dataproxy-sdk-golang/dataproxy/request.go      |   5 +
 .../dataproxy-sdk-golang/dataproxy/worker.go       | 115 ++++++++++++---------
 2 files changed, 73 insertions(+), 47 deletions(-)

diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/request.go b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/request.go
index f45204c809..ac89aba142 100755
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/request.go
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/request.go
@@ -334,6 +334,11 @@ type closeReq struct {
        doneCh chan struct{}
 }
 
+type sendFailedBatchReq struct {
+       batch *batchReq
+       retry bool
+}
+
 func getWorkerIndex(workerID string) int {
        if workerID == "" {
                return -1
diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/worker.go b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/worker.go
index 0734c06c56..186a8e3be7 100755
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/worker.go
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/worker.go
@@ -58,7 +58,7 @@ var (
        errNewConnFailed    = &errNo{code: 10006, strCode: "10006", message: 
"new conn failed"}
        errConnWriteFailed  = &errNo{code: 10007, strCode: "10007", message: 
"conn write failed"}
        errConnReadFailed   = &errNo{code: 10008, strCode: "10008", message: 
"conn read failed"}
-       errLogToLong        = &errNo{code: 10009, strCode: "10009", message: 
"input log is too long"} //nolint:unused
+       errLogTooLong       = &errNo{code: 10009, strCode: "10009", message: 
"input log is too long"} //nolint:unused
        errBadLog           = &errNo{code: 10010, strCode: "10010", message: 
"input log is invalid"}
        errServerError      = &errNo{code: 10011, strCode: "10011", message: 
"server error"}
        errServerPanic      = &errNo{code: 10012, strCode: "10012", message: 
"server panic"}
@@ -101,31 +101,31 @@ func getErrorCode(err error) string {
 }
 
 type worker struct {
-       client             *client               // parent client
-       index              int                   // worker id
-       indexStr           string                // worker id string
-       options            *Options              // config options
-       state              atomic.Int32          // worker state
-       log                logger.Logger         // debug logger
-       conn               atomic.Value          // connection used to send data
-       cmdChan            chan interface{}      // command channel
-       dataChan           chan *sendDataReq     // data channel
-       dataSemaphore      syncx.Semaphore       // semaphore used to handle 
message queueing
-       pendingBatches     map[string]*batchReq  // pending batches
-       unackedBatches     map[string]*batchReq  // sent but not acknowledged 
batches
-       sendFailedBatches  chan *batchReq        // send failed batches channel
-       retryBatches       chan *batchReq        // retry batches  channel
-       responseBatches    chan batchRsp         // batch response channel
-       batchTimeoutTicker *time.Ticker          // batch timeout ticker
-       sendTimeoutTicker  *time.Ticker          // send timeout ticker
-       heartbeatTicker    *time.Ticker          // heartbeat ticker
-       mapCleanTicker     *time.Ticker          // map clean ticker, clean the 
unackedBatches map periodically
-       updateConnTicker   *time.Ticker          // update connection ticker, 
change connection periodically
-       unackedBatchCount  int                   // sent but not acknowledged 
batches counter, used to clean the unackedBatches map periodically
-       metrics            *metrics              // metrics
-       bufferPool         bufferpool.BufferPool // buffer pool
-       bytePool           bufferpool.BytePool   // byte pool
-       stop               bool                  // stop the worker
+       client             *client                 // parent client
+       index              int                     // worker id
+       indexStr           string                  // worker id string
+       options            *Options                // config options
+       state              atomic.Int32            // worker state
+       log                logger.Logger           // debug logger
+       conn               atomic.Value            // connection used to send 
data
+       cmdChan            chan interface{}        // command channel
+       dataChan           chan *sendDataReq       // data channel
+       dataSemaphore      syncx.Semaphore         // semaphore used to handle 
message queueing
+       pendingBatches     map[string]*batchReq    // pending batches
+       unackedBatches     map[string]*batchReq    // sent but not acknowledged 
batches
+       sendFailedBatches  chan sendFailedBatchReq // send failed batches 
channel
+       retryBatches       chan *batchReq          // retry batches  channel
+       responseBatches    chan batchRsp           // batch response channel
+       batchTimeoutTicker *time.Ticker            // batch timeout ticker
+       sendTimeoutTicker  *time.Ticker            // send timeout ticker
+       heartbeatTicker    *time.Ticker            // heartbeat ticker
+       mapCleanTicker     *time.Ticker            // map clean ticker, clean 
the unackedBatches map periodically
+       updateConnTicker   *time.Ticker            // update connection ticker, 
change connection periodically
+       unackedBatchCount  int                     // sent but not acknowledged 
batches counter, used to clean the unackedBatches map periodically
+       metrics            *metrics                // metrics
+       bufferPool         bufferpool.BufferPool   // buffer pool
+       bytePool           bufferpool.BytePool     // byte pool
+       stop               bool                    // stop the worker
 }
 
 func newWorker(cli *client, index int, opts *Options) (*worker, error) {
@@ -144,7 +144,7 @@ func newWorker(cli *client, index int, opts *Options) (*worker, error) {
                dataSemaphore:      
syncx.NewSemaphore(int32(opts.MaxPendingMessages)),
                pendingBatches:     make(map[string]*batchReq),
                unackedBatches:     make(map[string]*batchReq),
-               sendFailedBatches:  make(chan *batchReq, 
opts.MaxPendingMessages),
+               sendFailedBatches:  make(chan sendFailedBatchReq, 
opts.MaxPendingMessages),
                retryBatches:       make(chan *batchReq, 
opts.MaxPendingMessages),
                responseBatches:    make(chan batchRsp, 
opts.MaxPendingMessages),
                batchTimeoutTicker: 
time.NewTicker(opts.BatchingMaxPublishDelay),
@@ -353,7 +353,7 @@ func (w *worker) sendBatch(b *batchReq, retryOnFail bool) {
        b.encode()
 
        //error callback
-       onErr := func(e error) {
+       onErr := func(c gnet.Conn, e error, inCallback bool) {
                defer func() {
                        if rec := recover(); rec != nil {
                                w.log.Error("panic:", rec)
@@ -371,15 +371,19 @@ func (w *worker) sendBatch(b *batchReq, retryOnFail bool) {
                        return
                }
 
+               // network error, change a new connection
+               w.updateConn(c, errConnWriteFailed)
+
                // important:when AsyncWrite() call succeed, the batch will be 
put into w.unackedBatches,now it failed, we need
                // to delete from w.unackedBatches, as onErr() is call 
concurrently in different goroutine, we can not delete it
                // from this callback directly, or will be panic, so we put 
into the w.sendFailedBatches channel, and it will be
-               // deleted in handleSendFailed() one by one
-               w.sendFailedBatches <- b
+               // deleted and retried in handleSendFailed() one by one
+               if inCallback {
+                       w.sendFailedBatches <- sendFailedBatchReq{batch: b, 
retry: retryOnFail}
+                       return
+               }
 
-               // network error, change a new connection
-               w.updateConn(errConnWriteFailed)
-               // put the batch to the retry channel
+               // in a same goroutine, retry it directly
                if retryOnFail {
                        // w.retryBatches <- b
                        w.backoffRetry(context.Background(), b)
@@ -393,13 +397,13 @@ func (w *worker) sendBatch(b *batchReq, retryOnFail bool) {
        conn := w.getConn()
        err := conn.AsyncWrite(b.buffer.Bytes(), func(c gnet.Conn, e error) 
error {
                if e != nil {
-                       onErr(e) //error callback
+                       onErr(c, e, true) //error callback
                }
                return nil
        })
 
        if err != nil {
-               onErr(err) //error callback
+               onErr(conn, err, false) //error callback
                return
        }
 
@@ -411,9 +415,14 @@ func (w *worker) sendBatch(b *batchReq, retryOnFail bool) {
        w.unackedBatches[b.batchID] = b
 }
 
-func (w *worker) handleSendFailed(b *batchReq) {
-       // send failed, delete the batch from unackedBatches
-       delete(w.unackedBatches, b.batchID)
+func (w *worker) handleSendFailed(b sendFailedBatchReq) {
+       // send failed, delete the batch from unackedBatches, when retried, it 
will be pushed back
+       delete(w.unackedBatches, b.batch.batchID)
+       if b.retry {
+               w.backoffRetry(context.Background(), b.batch)
+       } else {
+               b.batch.done(errConnWriteFailed)
+       }
 }
 
 func (w *worker) backoffRetry(ctx context.Context, batch *batchReq) {
@@ -532,17 +541,17 @@ func (w *worker) handleSendHeartbeat() {
        bb := w.bufferPool.Get()
        bytes := hb.encode(bb)
 
-       onErr := func(e error) {
+       onErr := func(c gnet.Conn, e error) {
                w.metrics.incError(errConnWriteFailed.getStrCode())
                w.log.Error("send heartbeat failed, err:", e)
-               w.updateConn(errConnWriteFailed)
+               w.updateConn(c, errConnWriteFailed)
        }
 
        // very important:'cause we use gnet, we must call AsyncWrite to send 
data in goroutines that are different from gnet.OnTraffic() callback
        conn := w.getConn()
        err := conn.AsyncWrite(bytes, func(c gnet.Conn, e error) error {
                if e != nil {
-                       onErr(e)
+                       onErr(c, e)
                }
                // recycle the buffer
                w.bufferPool.Put(bb)
@@ -550,7 +559,7 @@ func (w *worker) handleSendHeartbeat() {
        })
 
        if err != nil {
-               onErr(err)
+               onErr(conn, err)
                // recycle the buffer
                w.bufferPool.Put(bb)
        }
@@ -695,10 +704,10 @@ func (w *worker) handleClose(req *closeReq) {
 }
 
 func (w *worker) handleUpdateConn() {
-       w.updateConn(nil)
+       w.updateConn(nil, nil)
 }
 
-func (w *worker) updateConn(err error) {
+func (w *worker) updateConn(old gnet.Conn, err error) {
        w.log.Debug("worker[", w.index, "] updateConn")
        newConn, newErr := w.client.getConn()
        if newErr != nil {
@@ -707,10 +716,18 @@ func (w *worker) updateConn(err error) {
                return
        }
 
-       oldConn := w.getConn()
+       oldConn := old
+       if oldConn == nil {
+               oldConn = w.getConn()
+       }
+
        w.client.putConn(oldConn, err)
-       w.setConn(newConn)
-       w.metrics.incUpdateConn(getErrorCode(err))
+       ok := w.casConn(oldConn, newConn)
+       if ok {
+               w.metrics.incUpdateConn(getErrorCode(err))
+       } else {
+               w.client.putConn(newConn, nil)
+       }
 }
 
 func (w *worker) setConn(conn gnet.Conn) {
@@ -721,6 +738,10 @@ func (w *worker) getConn() gnet.Conn {
        return w.conn.Load().(gnet.Conn)
 }
 
+func (w *worker) casConn(oldConn, newConn gnet.Conn) bool {
+       return w.conn.CompareAndSwap(oldConn, newConn)
+}
+
 func (w *worker) setState(state workerState) {
        w.state.Swap(int32(state))
 }

Reply via email to