This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new ed9009a640 [INLONG-8639][SDK] Improve send failed logic (#8640)
ed9009a640 is described below
commit ed9009a640319fbb2929f80ca022fbf4e4463a9a
Author: gunli <[email protected]>
AuthorDate: Sun Aug 6 15:54:43 2023 +0800
[INLONG-8639][SDK] Improve send failed logic (#8640)
Co-authored-by: gunli <[email protected]>
---
.../dataproxy-sdk-golang/dataproxy/request.go | 5 +
.../dataproxy-sdk-golang/dataproxy/worker.go | 115 ++++++++++++---------
2 files changed, 73 insertions(+), 47 deletions(-)
diff --git
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/request.go
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/request.go
index f45204c809..ac89aba142 100755
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/request.go
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/request.go
@@ -334,6 +334,11 @@ type closeReq struct {
doneCh chan struct{}
}
+type sendFailedBatchReq struct {
+ batch *batchReq
+ retry bool
+}
+
func getWorkerIndex(workerID string) int {
if workerID == "" {
return -1
diff --git
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/worker.go
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/worker.go
index 0734c06c56..186a8e3be7 100755
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/worker.go
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/worker.go
@@ -58,7 +58,7 @@ var (
errNewConnFailed = &errNo{code: 10006, strCode: "10006", message:
"new conn failed"}
errConnWriteFailed = &errNo{code: 10007, strCode: "10007", message:
"conn write failed"}
errConnReadFailed = &errNo{code: 10008, strCode: "10008", message:
"conn read failed"}
- errLogToLong = &errNo{code: 10009, strCode: "10009", message:
"input log is too long"} //nolint:unused
+ errLogTooLong = &errNo{code: 10009, strCode: "10009", message:
"input log is too long"} //nolint:unused
errBadLog = &errNo{code: 10010, strCode: "10010", message:
"input log is invalid"}
errServerError = &errNo{code: 10011, strCode: "10011", message:
"server error"}
errServerPanic = &errNo{code: 10012, strCode: "10012", message:
"server panic"}
@@ -101,31 +101,31 @@ func getErrorCode(err error) string {
}
type worker struct {
- client *client // parent client
- index int // worker id
- indexStr string // worker id string
- options *Options // config options
- state atomic.Int32 // worker state
- log logger.Logger // debug logger
- conn atomic.Value // connection used to send data
- cmdChan chan interface{} // command channel
- dataChan chan *sendDataReq // data channel
- dataSemaphore syncx.Semaphore // semaphore used to handle
message queueing
- pendingBatches map[string]*batchReq // pending batches
- unackedBatches map[string]*batchReq // sent but not acknowledged
batches
- sendFailedBatches chan *batchReq // send failed batches channel
- retryBatches chan *batchReq // retry batches channel
- responseBatches chan batchRsp // batch response channel
- batchTimeoutTicker *time.Ticker // batch timeout ticker
- sendTimeoutTicker *time.Ticker // send timeout ticker
- heartbeatTicker *time.Ticker // heartbeat ticker
- mapCleanTicker *time.Ticker // map clean ticker, clean the
unackedBatches map periodically
- updateConnTicker *time.Ticker // update connection ticker,
change connection periodically
- unackedBatchCount int // sent but not acknowledged
batches counter, used to clean the unackedBatches map periodically
- metrics *metrics // metrics
- bufferPool bufferpool.BufferPool // buffer pool
- bytePool bufferpool.BytePool // byte pool
- stop bool // stop the worker
+ client *client // parent client
+ index int // worker id
+ indexStr string // worker id string
+ options *Options // config options
+ state atomic.Int32 // worker state
+ log logger.Logger // debug logger
+ conn atomic.Value // connection used to send
data
+ cmdChan chan interface{} // command channel
+ dataChan chan *sendDataReq // data channel
+ dataSemaphore syncx.Semaphore // semaphore used to handle
message queueing
+ pendingBatches map[string]*batchReq // pending batches
+ unackedBatches map[string]*batchReq // sent but not acknowledged
batches
+ sendFailedBatches chan sendFailedBatchReq // send failed batches
channel
+ retryBatches chan *batchReq // retry batches channel
+ responseBatches chan batchRsp // batch response channel
+ batchTimeoutTicker *time.Ticker // batch timeout ticker
+ sendTimeoutTicker *time.Ticker // send timeout ticker
+ heartbeatTicker *time.Ticker // heartbeat ticker
+ mapCleanTicker *time.Ticker // map clean ticker, clean
the unackedBatches map periodically
+ updateConnTicker *time.Ticker // update connection ticker,
change connection periodically
+ unackedBatchCount int // sent but not acknowledged
batches counter, used to clean the unackedBatches map periodically
+ metrics *metrics // metrics
+ bufferPool bufferpool.BufferPool // buffer pool
+ bytePool bufferpool.BytePool // byte pool
+ stop bool // stop the worker
}
func newWorker(cli *client, index int, opts *Options) (*worker, error) {
@@ -144,7 +144,7 @@ func newWorker(cli *client, index int, opts *Options)
(*worker, error) {
dataSemaphore:
syncx.NewSemaphore(int32(opts.MaxPendingMessages)),
pendingBatches: make(map[string]*batchReq),
unackedBatches: make(map[string]*batchReq),
- sendFailedBatches: make(chan *batchReq,
opts.MaxPendingMessages),
+ sendFailedBatches: make(chan sendFailedBatchReq,
opts.MaxPendingMessages),
retryBatches: make(chan *batchReq,
opts.MaxPendingMessages),
responseBatches: make(chan batchRsp,
opts.MaxPendingMessages),
batchTimeoutTicker:
time.NewTicker(opts.BatchingMaxPublishDelay),
@@ -353,7 +353,7 @@ func (w *worker) sendBatch(b *batchReq, retryOnFail bool) {
b.encode()
//error callback
- onErr := func(e error) {
+ onErr := func(c gnet.Conn, e error, inCallback bool) {
defer func() {
if rec := recover(); rec != nil {
w.log.Error("panic:", rec)
@@ -371,15 +371,19 @@ func (w *worker) sendBatch(b *batchReq, retryOnFail bool)
{
return
}
+ // network error, change a new connection
+ w.updateConn(c, errConnWriteFailed)
+
// important:when AsyncWrite() call succeed, the batch will be
put into w.unackedBatches,now it failed, we need
// to delete from w.unackedBatches, as onErr() is call
concurrently in different goroutine, we can not delete it
// from this callback directly, or will be panic, so we put
into the w.sendFailedBatches channel, and it will be
- // deleted in handleSendFailed() one by one
- w.sendFailedBatches <- b
+ // deleted and retried in handleSendFailed() one by one
+ if inCallback {
+ w.sendFailedBatches <- sendFailedBatchReq{batch: b,
retry: retryOnFail}
+ return
+ }
- // network error, change a new connection
- w.updateConn(errConnWriteFailed)
- // put the batch to the retry channel
+ // in a same goroutine, retry it directly
if retryOnFail {
// w.retryBatches <- b
w.backoffRetry(context.Background(), b)
@@ -393,13 +397,13 @@ func (w *worker) sendBatch(b *batchReq, retryOnFail bool)
{
conn := w.getConn()
err := conn.AsyncWrite(b.buffer.Bytes(), func(c gnet.Conn, e error)
error {
if e != nil {
- onErr(e) //error callback
+ onErr(c, e, true) //error callback
}
return nil
})
if err != nil {
- onErr(err) //error callback
+ onErr(conn, err, false) //error callback
return
}
@@ -411,9 +415,14 @@ func (w *worker) sendBatch(b *batchReq, retryOnFail bool) {
w.unackedBatches[b.batchID] = b
}
-func (w *worker) handleSendFailed(b *batchReq) {
- // send failed, delete the batch from unackedBatches
- delete(w.unackedBatches, b.batchID)
+func (w *worker) handleSendFailed(b sendFailedBatchReq) {
+ // send failed, delete the batch from unackedBatches, when retried, it
will be pushed back
+ delete(w.unackedBatches, b.batch.batchID)
+ if b.retry {
+ w.backoffRetry(context.Background(), b.batch)
+ } else {
+ b.batch.done(errConnWriteFailed)
+ }
}
func (w *worker) backoffRetry(ctx context.Context, batch *batchReq) {
@@ -532,17 +541,17 @@ func (w *worker) handleSendHeartbeat() {
bb := w.bufferPool.Get()
bytes := hb.encode(bb)
- onErr := func(e error) {
+ onErr := func(c gnet.Conn, e error) {
w.metrics.incError(errConnWriteFailed.getStrCode())
w.log.Error("send heartbeat failed, err:", e)
- w.updateConn(errConnWriteFailed)
+ w.updateConn(c, errConnWriteFailed)
}
// very important:'cause we use gnet, we must call AsyncWrite to send
data in goroutines that are different from gnet.OnTraffic() callback
conn := w.getConn()
err := conn.AsyncWrite(bytes, func(c gnet.Conn, e error) error {
if e != nil {
- onErr(e)
+ onErr(c, e)
}
// recycle the buffer
w.bufferPool.Put(bb)
@@ -550,7 +559,7 @@ func (w *worker) handleSendHeartbeat() {
})
if err != nil {
- onErr(err)
+ onErr(conn, err)
// recycle the buffer
w.bufferPool.Put(bb)
}
@@ -695,10 +704,10 @@ func (w *worker) handleClose(req *closeReq) {
}
func (w *worker) handleUpdateConn() {
- w.updateConn(nil)
+ w.updateConn(nil, nil)
}
-func (w *worker) updateConn(err error) {
+func (w *worker) updateConn(old gnet.Conn, err error) {
w.log.Debug("worker[", w.index, "] updateConn")
newConn, newErr := w.client.getConn()
if newErr != nil {
@@ -707,10 +716,21 @@ func (w *worker) updateConn(err error) {
 		return
 	}
 
-	oldConn := w.getConn()
-	w.client.putConn(oldConn, err)
-	w.setConn(newConn)
-	w.metrics.incUpdateConn(getErrorCode(err))
+	oldConn := old
+	if oldConn == nil {
+		oldConn = w.getConn()
+	}
+
+	// Swap first, and let only the goroutine that wins the swap hand the old
+	// connection back: several failed writes on the same conn may call
+	// updateConn concurrently, and oldConn must be returned exactly once.
+	if !w.casConn(oldConn, newConn) {
+		// oldConn was already replaced by someone else; newConn is unused
+		w.client.putConn(newConn, nil)
+		return
+	}
+	w.client.putConn(oldConn, err)
+	w.metrics.incUpdateConn(getErrorCode(err))
 }
 
 func (w *worker) setConn(conn gnet.Conn) {
@@ -721,6 +738,10 @@ func (w *worker) getConn() gnet.Conn {
return w.conn.Load().(gnet.Conn)
}
+func (w *worker) casConn(oldConn, newConn gnet.Conn) bool {
+ return w.conn.CompareAndSwap(oldConn, newConn)
+}
+
func (w *worker) setState(state workerState) {
w.state.Swap(int32(state))
}