This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 0edced5706 [INLONG-12058][SDK] Support retry mechanism when "server error" occurs in dataproxy client (#12059)
0edced5706 is described below

commit 0edced5706272a1b1d2bea627e74ec66a05938ec
Author: yfsn666 <[email protected]>
AuthorDate: Mon Jan 5 10:08:57 2026 +0800

    [INLONG-12058][SDK] Support retry mechanism when "server error" occurs in dataproxy client (#12059)
---
 .../dataproxy-sdk-golang/dataproxy/options.go      |  6 ++
 .../dataproxy/options_producer.go                  | 17 +++++
 .../dataproxy-sdk-golang/dataproxy/worker.go       | 87 ++++++++++++++++------
 3 files changed, 89 insertions(+), 21 deletions(-)

diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/options.go b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/options.go
index 985802ed6d..200096d8ce 100755
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/options.go
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/options.go
@@ -68,6 +68,8 @@ type Options struct {
        WorkerNum               int                   // worker number, 
default: 8
        SendTimeout             time.Duration         // send timeout, default: 
30000ms
        MaxRetries              int                   // max retry count, 
default: 2
+       RetryOnServerError      bool                  // whether to retry on 
server error, default: false
+       RetryInitialInterval    time.Duration         // initial retry interval 
for exponential backoff, default: 100ms
        BatchingMaxPublishDelay time.Duration         // the time period within 
which the messages sent will be batched, default: 20ms
        BatchingMaxMessages     int                   // the maximum number of 
messages permitted in a batch, default: 50
        BatchingMaxSize         int                   // the maximum number of 
bytes permitted in a batch, default: 40K
@@ -159,6 +161,10 @@ func (options *Options) ValidateAndSetDefault() error {
                options.MaxRetries = 2
        }
 
+       if options.RetryInitialInterval <= 0 {
+               options.RetryInitialInterval = 100 * time.Millisecond
+       }
+
        if options.WriteBufferSize <= 0 {
                options.WriteBufferSize = 8 * 1024 * 1024
        }
diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/options_producer.go b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/options_producer.go
index 088a5263b7..a419d54528 100755
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/options_producer.go
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/options_producer.go
@@ -50,6 +50,23 @@ func WithMaxRetries(n int) Option {
        }
 }
 
+// WithRetryOnServerError sets RetryOnServerError
+func WithRetryOnServerError(retry bool) Option {
+       return func(o *Options) {
+               o.RetryOnServerError = retry
+       }
+}
+
+// WithRetryInitialInterval sets RetryInitialInterval
+func WithRetryInitialInterval(interval time.Duration) Option {
+       return func(o *Options) {
+               if interval <= 0 {
+                       return
+               }
+               o.RetryInitialInterval = interval
+       }
+}
+
 // WithBatchingMaxPublishDelay sets BatchingMaxPublishDelay
 func WithBatchingMaxPublishDelay(t time.Duration) Option {
        return func(o *Options) {
diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/worker.go b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/worker.go
index 02827e522f..46c6cfde3c 100755
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/worker.go
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/worker.go
@@ -41,6 +41,12 @@ const (
        defaultMapCleanThreshold = 500000
 )
 
+// retryableServerErrorCodes defines which server error codes should trigger 
retry
+// and whether connection switch is needed.
+var retryableServerErrorCodes = map[int]bool{
+       2: true, // SERVICE_CLOSED
+}
+
 type workerState int32
 
 const (
@@ -52,35 +58,50 @@ const (
 )
 
 var (
-       errOK               = &errNo{code: 0, strCode: "0", message: "OK"}
-       errSendTimeout      = &errNo{code: 10001, strCode: "10001", message: 
"message send timeout"}
-       errSendFailed       = &errNo{code: 10002, strCode: "10002", message: 
"message send failed"} //nolint:unused
-       errProducerClosed   = &errNo{code: 10003, strCode: "10003", message: 
"producer already been closed"}
-       errSendQueueIsFull  = &errNo{code: 10004, strCode: "10004", message: 
"producer send queue is full"}
-       errContextExpired   = &errNo{code: 10005, strCode: "10005", message: 
"message context expired"}
-       errNewConnFailed    = &errNo{code: 10006, strCode: "10006", message: 
"new conn failed"}
-       errConnWriteFailed  = &errNo{code: 10007, strCode: "10007", message: 
"conn write failed"}
-       errConnReadFailed   = &errNo{code: 10008, strCode: "10008", message: 
"conn read failed"}
-       errLogTooLong       = &errNo{code: 10009, strCode: "10009", message: 
"input log is too long"} //nolint:unused
-       errBadLog           = &errNo{code: 10010, strCode: "10010", message: 
"input log is invalid"}
-       errServerError      = &errNo{code: 10011, strCode: "10011", message: 
"server error"} //nolint:unused
-       errServerPanic      = &errNo{code: 10012, strCode: "10012", message: 
"server panic"}
-       workerBusy          = &errNo{code: 10013, strCode: "10013", message: 
"worker is busy"}
-       errNoMatchReq4Rsp   = &errNo{code: 10014, strCode: "10014", message: 
"no match unacknowledged request for response"}
-       errConnClosedByPeer = &errNo{code: 10015, strCode: "10015", message: 
"conn closed by peer"}
-       errUnknown          = &errNo{code: 20001, strCode: "20001", message: 
"unknown"}
+       errOK               = &errNo{code: 0, strCode: "0", message: "OK", 
serverErrCode: -1}
+       errSendTimeout      = &errNo{code: 10001, strCode: "10001", message: 
"message send timeout", serverErrCode: -1}
+       errSendFailed       = &errNo{code: 10002, strCode: "10002", message: 
"message send failed", serverErrCode: -1} //nolint:unused
+       errProducerClosed   = &errNo{code: 10003, strCode: "10003", message: 
"producer already been closed", serverErrCode: -1}
+       errSendQueueIsFull  = &errNo{code: 10004, strCode: "10004", message: 
"producer send queue is full", serverErrCode: -1}
+       errContextExpired   = &errNo{code: 10005, strCode: "10005", message: 
"message context expired", serverErrCode: -1}
+       errNewConnFailed    = &errNo{code: 10006, strCode: "10006", message: 
"new conn failed", serverErrCode: -1}
+       errConnWriteFailed  = &errNo{code: 10007, strCode: "10007", message: 
"conn write failed", serverErrCode: -1}
+       errConnReadFailed   = &errNo{code: 10008, strCode: "10008", message: 
"conn read failed", serverErrCode: -1}
+       errLogTooLong       = &errNo{code: 10009, strCode: "10009", message: 
"input log is too long", serverErrCode: -1} //nolint:unused
+       errBadLog           = &errNo{code: 10010, strCode: "10010", message: 
"input log is invalid", serverErrCode: -1}
+       errServerError      = &errNo{code: 10011, strCode: "10011", message: 
"server error", serverErrCode: -1} //nolint:unused
+       errServerPanic      = &errNo{code: 10012, strCode: "10012", message: 
"server panic", serverErrCode: -1}
+       workerBusy          = &errNo{code: 10013, strCode: "10013", message: 
"worker is busy", serverErrCode: -1}
+       errNoMatchReq4Rsp   = &errNo{code: 10014, strCode: "10014", message: 
"no match unacknowledged request for response", serverErrCode: -1}
+       errConnClosedByPeer = &errNo{code: 10015, strCode: "10015", message: 
"conn closed by peer", serverErrCode: -1}
+       errUnknown          = &errNo{code: 20001, strCode: "20001", message: 
"unknown", serverErrCode: -1}
 )
 
 type errNo struct {
-       code    int
-       strCode string
-       message string
+       code          int
+       strCode       string
+       message       string
+       serverErrCode int // server error code from server response, -1 means 
not a server error
 }
 
 func (e *errNo) Error() string {
        return e.message
 }
 
+// GetServerErrorCode extracts server error code from error.
+func GetServerErrorCode(err error) int {
+       if err == nil {
+               return 0
+       }
+       var t *errNo
+       switch {
+       case errors.As(err, &t):
+               return t.getServerErrCode()
+       default:
+               return -1
+       }
+}
+
 //nolint:unused
 func (e *errNo) getCode() int {
        return e.code
@@ -90,6 +111,10 @@ func (e *errNo) getStrCode() string {
        return e.strCode
 }
 
+func (e *errNo) getServerErrCode() int {
+       return e.serverErrCode
+}
+
 func getErrorCode(err error) string {
        if err == nil {
                return errOK.getStrCode()
@@ -492,7 +517,7 @@ func (w *worker) backoffRetry(ctx context.Context, batch 
*batchReq) {
 
                // use ExponentialBackoff
                backoff := util.ExponentialBackoff{
-                       InitialInterval: 100 * time.Millisecond,
+                       InitialInterval: w.options.RetryInitialInterval,
                        MaxInterval:     10 * time.Second,
                        Multiplier:      2.0,
                        Randomization:   0.2,
@@ -622,6 +647,25 @@ func (w *worker) handleRsp(rsp *batchRsp) {
        // call batch.done to release the resources it holds
        var err = error(nil)
        if rsp.errCode != 0 {
+               // Check if connection switch is needed
+               needSwitchConn, isRetryable := 
retryableServerErrorCodes[rsp.errCode]
+               if needSwitchConn && w.client != nil {
+                       w.log.Warn("server error detected, switching 
connection, errCode:", rsp.errCode,
+                               ", batchID:", batch.batchID)
+                       w.updateConn(nil, nil)
+               }
+
+               // Check if retry is needed
+               if w.options.RetryOnServerError && isRetryable && batch.retries 
< w.options.MaxRetries {
+                       delete(w.unackedBatches, batchID)
+
+                       w.log.Warn("server error, will retry, errCode:", 
rsp.errCode,
+                               ", batchID:", batch.batchID, ", retries:", 
batch.retries)
+
+                       w.backoffRetry(context.Background(), batch)
+                       return
+               }
+
                err = &errNo{
                        code:    10011,
                        strCode: "10011",
@@ -631,6 +675,7 @@ func (w *worker) handleRsp(rsp *batchRsp) {
                                ", groupID=" + rsp.groupID +
                                ", streamID=" + rsp.streamID +
                                ", dt=" + rsp.dt,
+                       serverErrCode: rsp.errCode,
                }
                w.log.Error("send succeed but got error code:", rsp.errCode)
        }

Reply via email to