This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 0edced5706 [INLONG-12058][SDK] Support retry mechanism when "server error" occurs in dataproxy client (#12059)
0edced5706 is described below
commit 0edced5706272a1b1d2bea627e74ec66a05938ec
Author: yfsn666 <[email protected]>
AuthorDate: Mon Jan 5 10:08:57 2026 +0800
[INLONG-12058][SDK] Support retry mechanism when "server error" occurs in dataproxy client (#12059)
---
.../dataproxy-sdk-golang/dataproxy/options.go | 6 ++
.../dataproxy/options_producer.go | 17 +++++
.../dataproxy-sdk-golang/dataproxy/worker.go | 87 ++++++++++++++++------
3 files changed, 89 insertions(+), 21 deletions(-)
diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/options.go b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/options.go
index 985802ed6d..200096d8ce 100755
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/options.go
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/options.go
@@ -68,6 +68,8 @@ type Options struct {
WorkerNum int // worker number, default: 8
SendTimeout time.Duration // send timeout, default: 30000ms
MaxRetries int // max retry count, default: 2
+ RetryOnServerError bool // whether to retry on server error, default: false
+ RetryInitialInterval time.Duration // initial retry interval for exponential backoff, default: 100ms
BatchingMaxPublishDelay time.Duration // the time period within which the messages sent will be batched, default: 20ms
BatchingMaxMessages int // the maximum number of messages permitted in a batch, default: 50
BatchingMaxSize int // the maximum number of bytes permitted in a batch, default: 40K
@@ -159,6 +161,10 @@ func (options *Options) ValidateAndSetDefault() error {
options.MaxRetries = 2
}
+ if options.RetryInitialInterval <= 0 {
+ options.RetryInitialInterval = 100 * time.Millisecond
+ }
+
if options.WriteBufferSize <= 0 {
options.WriteBufferSize = 8 * 1024 * 1024
}
diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/options_producer.go b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/options_producer.go
index 088a5263b7..a419d54528 100755
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/options_producer.go
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/options_producer.go
@@ -50,6 +50,23 @@ func WithMaxRetries(n int) Option {
}
}
+// WithRetryOnServerError sets RetryOnServerError
+func WithRetryOnServerError(retry bool) Option {
+ return func(o *Options) {
+ o.RetryOnServerError = retry
+ }
+}
+
+// WithRetryInitialInterval sets RetryInitialInterval
+func WithRetryInitialInterval(interval time.Duration) Option {
+ return func(o *Options) {
+ if interval <= 0 {
+ return
+ }
+ o.RetryInitialInterval = interval
+ }
+}
+
// WithBatchingMaxPublishDelay sets BatchingMaxPublishDelay
func WithBatchingMaxPublishDelay(t time.Duration) Option {
return func(o *Options) {
diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/worker.go b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/worker.go
index 02827e522f..46c6cfde3c 100755
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/worker.go
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/worker.go
@@ -41,6 +41,12 @@ const (
defaultMapCleanThreshold = 500000
)
+// retryableServerErrorCodes defines which server error codes should trigger retry
+// and whether connection switch is needed.
+var retryableServerErrorCodes = map[int]bool{
+ 2: true, // SERVICE_CLOSED
+}
+
type workerState int32
const (
@@ -52,35 +58,50 @@ const (
)
var (
- errOK = &errNo{code: 0, strCode: "0", message: "OK"}
- errSendTimeout = &errNo{code: 10001, strCode: "10001", message: "message send timeout"}
- errSendFailed = &errNo{code: 10002, strCode: "10002", message: "message send failed"} //nolint:unused
- errProducerClosed = &errNo{code: 10003, strCode: "10003", message: "producer already been closed"}
- errSendQueueIsFull = &errNo{code: 10004, strCode: "10004", message: "producer send queue is full"}
- errContextExpired = &errNo{code: 10005, strCode: "10005", message: "message context expired"}
- errNewConnFailed = &errNo{code: 10006, strCode: "10006", message: "new conn failed"}
- errConnWriteFailed = &errNo{code: 10007, strCode: "10007", message: "conn write failed"}
- errConnReadFailed = &errNo{code: 10008, strCode: "10008", message: "conn read failed"}
- errLogTooLong = &errNo{code: 10009, strCode: "10009", message: "input log is too long"} //nolint:unused
- errBadLog = &errNo{code: 10010, strCode: "10010", message: "input log is invalid"}
- errServerError = &errNo{code: 10011, strCode: "10011", message: "server error"} //nolint:unused
- errServerPanic = &errNo{code: 10012, strCode: "10012", message: "server panic"}
- workerBusy = &errNo{code: 10013, strCode: "10013", message: "worker is busy"}
- errNoMatchReq4Rsp = &errNo{code: 10014, strCode: "10014", message: "no match unacknowledged request for response"}
- errConnClosedByPeer = &errNo{code: 10015, strCode: "10015", message: "conn closed by peer"}
- errUnknown = &errNo{code: 20001, strCode: "20001", message: "unknown"}
+ errOK = &errNo{code: 0, strCode: "0", message: "OK", serverErrCode: -1}
+ errSendTimeout = &errNo{code: 10001, strCode: "10001", message: "message send timeout", serverErrCode: -1}
+ errSendFailed = &errNo{code: 10002, strCode: "10002", message: "message send failed", serverErrCode: -1} //nolint:unused
+ errProducerClosed = &errNo{code: 10003, strCode: "10003", message: "producer already been closed", serverErrCode: -1}
+ errSendQueueIsFull = &errNo{code: 10004, strCode: "10004", message: "producer send queue is full", serverErrCode: -1}
+ errContextExpired = &errNo{code: 10005, strCode: "10005", message: "message context expired", serverErrCode: -1}
+ errNewConnFailed = &errNo{code: 10006, strCode: "10006", message: "new conn failed", serverErrCode: -1}
+ errConnWriteFailed = &errNo{code: 10007, strCode: "10007", message: "conn write failed", serverErrCode: -1}
+ errConnReadFailed = &errNo{code: 10008, strCode: "10008", message: "conn read failed", serverErrCode: -1}
+ errLogTooLong = &errNo{code: 10009, strCode: "10009", message: "input log is too long", serverErrCode: -1} //nolint:unused
+ errBadLog = &errNo{code: 10010, strCode: "10010", message: "input log is invalid", serverErrCode: -1}
+ errServerError = &errNo{code: 10011, strCode: "10011", message: "server error", serverErrCode: -1} //nolint:unused
+ errServerPanic = &errNo{code: 10012, strCode: "10012", message: "server panic", serverErrCode: -1}
+ workerBusy = &errNo{code: 10013, strCode: "10013", message: "worker is busy", serverErrCode: -1}
+ errNoMatchReq4Rsp = &errNo{code: 10014, strCode: "10014", message: "no match unacknowledged request for response", serverErrCode: -1}
+ errConnClosedByPeer = &errNo{code: 10015, strCode: "10015", message: "conn closed by peer", serverErrCode: -1}
+ errUnknown = &errNo{code: 20001, strCode: "20001", message: "unknown", serverErrCode: -1}
)
type errNo struct {
- code int
- strCode string
- message string
+ code int
+ strCode string
+ message string
+ serverErrCode int // server error code from server response, -1 means not a server error
}
func (e *errNo) Error() string {
return e.message
}
+// GetServerErrorCode extracts server error code from error.
+func GetServerErrorCode(err error) int {
+ if err == nil {
+ return 0
+ }
+ var t *errNo
+ switch {
+ case errors.As(err, &t):
+ return t.getServerErrCode()
+ default:
+ return -1
+ }
+}
+
//nolint:unused
func (e *errNo) getCode() int {
return e.code
@@ -90,6 +111,10 @@ func (e *errNo) getStrCode() string {
return e.strCode
}
+func (e *errNo) getServerErrCode() int {
+ return e.serverErrCode
+}
+
func getErrorCode(err error) string {
if err == nil {
return errOK.getStrCode()
@@ -492,7 +517,7 @@ func (w *worker) backoffRetry(ctx context.Context, batch *batchReq) {
// use ExponentialBackoff
backoff := util.ExponentialBackoff{
- InitialInterval: 100 * time.Millisecond,
+ InitialInterval: w.options.RetryInitialInterval,
MaxInterval: 10 * time.Second,
Multiplier: 2.0,
Randomization: 0.2,
@@ -622,6 +647,25 @@ func (w *worker) handleRsp(rsp *batchRsp) {
// call batch.done to release the resources it holds
var err = error(nil)
if rsp.errCode != 0 {
+ // Check if connection switch is needed
+ needSwitchConn, isRetryable := retryableServerErrorCodes[rsp.errCode]
+ if needSwitchConn && w.client != nil {
+ w.log.Warn("server error detected, switching connection, errCode:", rsp.errCode,
+ ", batchID:", batch.batchID)
+ w.updateConn(nil, nil)
+ }
+
+ // Check if retry is needed
+ if w.options.RetryOnServerError && isRetryable && batch.retries < w.options.MaxRetries {
+ delete(w.unackedBatches, batchID)
+
+ w.log.Warn("server error, will retry, errCode:", rsp.errCode,
+ ", batchID:", batch.batchID, ", retries:", batch.retries)
+
+ w.backoffRetry(context.Background(), batch)
+ return
+ }
+
err = &errNo{
code: 10011,
strCode: "10011",
@@ -631,6 +675,7 @@ func (w *worker) handleRsp(rsp *batchRsp) {
", groupID=" + rsp.groupID +
", streamID=" + rsp.streamID +
", dt=" + rsp.dt,
+ serverErrCode: rsp.errCode,
}
w.log.Error("send succeed but got error code:", rsp.errCode)
}