This is an automated email from the ASF dual-hosted git repository.
wangdan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git
The following commit(s) were added to refs/heads/master by this push:
new 75212068c fix(go-client): add new error codes to distinguish between
timeout and session reset (#2286)
75212068c is described below
commit 75212068c3909213edfeb213e6da3497b277f25f
Author: nanorth <[email protected]>
AuthorDate: Mon Nov 10 17:51:17 2025 +0800
fix(go-client): add new error codes to distinguish between timeout and
session reset (#2286)
### What problem does this PR solve? <!--add issue link with summary if
exists-->
The existing Go client does not distinguish a request timeout from a session
reset: both return `context.DeadlineExceeded`, so both trigger a config
update.
This PR introduces a new client-defined error, `ERR_SESSION_RESET`, to
differentiate between these two cases. A session reset now returns
`ERR_SESSION_RESET` and triggers a config update, whereas a request timeout
(`context.DeadlineExceeded`) no longer does.
### Checklist <!--REMOVE the items that are not applicable-->
##### Tests <!-- At least one of them must be included. -->
- Unit test included
---
go-client/idl/base/dsn_err_string.go | 8 +++++---
go-client/idl/base/error_code.go | 2 ++
go-client/pegasus/table_connector.go | 4 +++-
go-client/pegasus/table_connector_test.go | 4 ++--
go-client/session/session.go | 2 +-
go-client/session/session_test.go | 32 +++++++++++++++++++++++++++++++
6 files changed, 45 insertions(+), 7 deletions(-)
diff --git a/go-client/idl/base/dsn_err_string.go
b/go-client/idl/base/dsn_err_string.go
index 4a41dd95d..915adc161 100644
--- a/go-client/idl/base/dsn_err_string.go
+++ b/go-client/idl/base/dsn_err_string.go
@@ -1,14 +1,15 @@
// Code generated by "enumer -type=DsnErrCode -output=dsn_err_string.go"; DO
NOT EDIT.
+//
package base
import (
"fmt"
)
-const _DsnErrCodeName =
"ERR_OKERR_UNKNOWNERR_REPLICATION_FAILUREERR_APP_EXISTERR_APP_NOT_EXISTERR_APP_DROPPEDERR_BUSY_CREATINGERR_BUSY_DROPPINGERR_EXPIREDERR_LOCK_ALREADY_EXISTERR_HOLD_BY_OTHERSERR_RECURSIVE_LOCKERR_NO_OWNERERR_NODE_ALREADY_EXISTERR_INCONSISTENT_STATEERR_ARRAY_INDEX_OUT_OF_RANGEERR_SERVICE_NOT_FOUNDERR_SERVICE_ALREADY_RUNNINGERR_IO_PENDINGERR_TIMEOUTERR_SERVICE_NOT_ACTIVEERR_BUSYERR_NETWORK_INIT_FAILEDERR_FORWARD_TO_OTHERSERR_OBJECT_NOT_FOUNDERR_HANDLER_NOT_FOUNDERR_LEA
[...]
+const _DsnErrCodeName =
"ERR_OKERR_UNKNOWNERR_REPLICATION_FAILUREERR_APP_EXISTERR_APP_NOT_EXISTERR_APP_DROPPEDERR_BUSY_CREATINGERR_BUSY_DROPPINGERR_EXPIREDERR_LOCK_ALREADY_EXISTERR_HOLD_BY_OTHERSERR_RECURSIVE_LOCKERR_NO_OWNERERR_NODE_ALREADY_EXISTERR_INCONSISTENT_STATEERR_ARRAY_INDEX_OUT_OF_RANGEERR_SERVICE_NOT_FOUNDERR_SERVICE_ALREADY_RUNNINGERR_IO_PENDINGERR_TIMEOUTERR_SERVICE_NOT_ACTIVEERR_BUSYERR_NETWORK_INIT_FAILEDERR_FORWARD_TO_OTHERSERR_OBJECT_NOT_FOUNDERR_HANDLER_NOT_FOUNDERR_LEA
[...]
-var _DsnErrCodeIndex = [...]uint16{0, 6, 17, 40, 53, 70, 85, 102, 119, 130,
152, 170, 188, 200, 222, 244, 272, 293, 320, 334, 345, 367, 375, 398, 419, 439,
460, 481, 507, 526, 548, 569, 586, 604, 625, 650, 664, 682, 698, 716, 735, 755,
773, 795, 819, 836, 857, 877, 901, 920, 941, 957, 976, 990, 1003, 1024, 1049,
1074, 1086, 1105, 1123, 1145, 1168, 1188, 1208, 1227, 1244, 1261, 1280, 1292,
1305, 1333, 1352, 1373}
+var _DsnErrCodeIndex = [...]uint16{0, 6, 17, 40, 53, 70, 85, 102, 119, 130,
152, 170, 188, 200, 222, 244, 272, 293, 320, 334, 345, 367, 375, 398, 419, 439,
460, 481, 507, 526, 548, 569, 586, 604, 625, 650, 664, 682, 698, 716, 735, 755,
773, 795, 819, 836, 857, 877, 901, 920, 941, 957, 976, 990, 1003, 1024, 1049,
1074, 1086, 1105, 1123, 1145, 1168, 1188, 1208, 1227, 1244, 1261, 1280, 1292,
1305, 1333, 1352, 1373, 1390}
func (i DsnErrCode) String() string {
if i < 0 || i >= DsnErrCode(len(_DsnErrCodeIndex)-1) {
@@ -17,7 +18,7 @@ func (i DsnErrCode) String() string {
return _DsnErrCodeName[_DsnErrCodeIndex[i]:_DsnErrCodeIndex[i+1]]
}
-var _DsnErrCodeValues = []DsnErrCode{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12,
13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32,
33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52,
53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72}
+var _DsnErrCodeValues = []DsnErrCode{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12,
13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32,
33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52,
53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72,
73}
var _DsnErrCodeNameToValueMap = map[string]DsnErrCode{
_DsnErrCodeName[0:6]: 0,
@@ -93,6 +94,7 @@ var _DsnErrCodeNameToValueMap = map[string]DsnErrCode{
_DsnErrCodeName[1305:1333]: 70,
_DsnErrCodeName[1333:1352]: 71,
_DsnErrCodeName[1352:1373]: 72,
+ _DsnErrCodeName[1373:1390]: 73,
}
// DsnErrCodeString retrieves an enum value from the enum constants string
name.
diff --git a/go-client/idl/base/error_code.go b/go-client/idl/base/error_code.go
index c105cb48b..7ff488e78 100644
--- a/go-client/idl/base/error_code.go
+++ b/go-client/idl/base/error_code.go
@@ -115,6 +115,8 @@ const (
ERR_PARENT_PARTITION_MISUSED
ERR_CHILD_NOT_READY
ERR_DISK_INSUFFICIENT
+ // ERROR_CODE defined by client
+ ERR_SESSION_RESET
)
func (e DsnErrCode) Error() string {
diff --git a/go-client/pegasus/table_connector.go
b/go-client/pegasus/table_connector.go
index f9d4ad51a..800d4fc67 100644
--- a/go-client/pegasus/table_connector.go
+++ b/go-client/pegasus/table_connector.go
@@ -836,8 +836,10 @@ func (p *pegasusTableConnector) handleReplicaError(err
error, replica *session.R
return false, false, nil
case base.ERR_TIMEOUT:
- case context.DeadlineExceeded:
+ case base.ERR_SESSION_RESET:
+ // connection with the server failed
confUpdate = true
+ case context.DeadlineExceeded:
case context.Canceled:
// timeout will not trigger a configuration update
diff --git a/go-client/pegasus/table_connector_test.go
b/go-client/pegasus/table_connector_test.go
index 75823c18c..73c5ef0c5 100644
--- a/go-client/pegasus/table_connector_test.go
+++ b/go-client/pegasus/table_connector_test.go
@@ -277,14 +277,14 @@ func TestPegasusTableConnector_TriggerSelfUpdate(t
*testing.T) {
assert.True(t, confUpdate)
assert.False(t, retry)
- confUpdate, retry, err =
ptb.handleReplicaError(context.DeadlineExceeded, nil)
+ confUpdate, retry, err = ptb.handleReplicaError(base.ERR_SESSION_RESET,
nil)
<-ptb.confUpdateCh
assert.Error(t, err)
assert.True(t, confUpdate)
assert.False(t, retry)
{ // Ensure: The following errors should not trigger configuration
update
- errorTypes := []error{base.ERR_TIMEOUT,
base.ERR_CAPACITY_EXCEEDED, base.ERR_NOT_ENOUGH_MEMBER, base.ERR_BUSY,
base.ERR_SPLITTING, base.ERR_DISK_INSUFFICIENT}
+ errorTypes := []error{base.ERR_TIMEOUT,
base.ERR_CAPACITY_EXCEEDED, base.ERR_NOT_ENOUGH_MEMBER, base.ERR_BUSY,
base.ERR_SPLITTING, base.ERR_DISK_INSUFFICIENT, context.DeadlineExceeded}
for _, err := range errorTypes {
channelEmpty := false
diff --git a/go-client/session/session.go b/go-client/session/session.go
index 0f226171f..84d465a4f 100644
--- a/go-client/session/session.go
+++ b/go-client/session/session.go
@@ -352,7 +352,7 @@ func (n *nodeSession) waitUntilSessionReady(ctx
context.Context) error {
if !ready {
//return error directly so that it can be recognized in
`handleReplicaError`
n.logger.Printf("session %s is unable to connect (used
%dms), the context error: %s", n, time.Since(dialStart)/time.Millisecond,
ctx.Err())
- return ctx.Err()
+ return base.ERR_SESSION_RESET
}
}
return nil
diff --git a/go-client/session/session_test.go
b/go-client/session/session_test.go
index 1860b186b..8efa06bfb 100644
--- a/go-client/session/session_test.go
+++ b/go-client/session/session_test.go
@@ -400,3 +400,35 @@ func TestNodeSession_ReadEOF(t *testing.T) {
time.Sleep(100 * time.Millisecond)
assert.Equal(t, n.ConnState(), rpc.ConnStateTransientFailure)
}
+
+func TestNodeSession_SessionResetDuringWait(t *testing.T) {
+ defer leaktest.Check(t)()
+
+ reader := bytes.NewBuffer(make([]byte, 0))
+ writer := &bytes.Buffer{}
+ n := newFakeNodeSession(reader, writer)
+ defer n.Close()
+
+ n.tom.Go(n.loopForRequest)
+ time.Sleep(100 * time.Millisecond)
+ assert.Equal(t, rpc.ConnStateReady, n.conn.GetState())
+
+ arg := rrdb.NewMetaQueryCfgArgs()
+ arg.Query = replication.NewQueryCfgRequest()
+
+ // test the scenario where the server is busy or unresponsive and
doesn't reply,
+ // causing the client to time out. This results in
context.DeadlineExceeded.
+ ctx, _ := context.WithTimeout(context.Background(), time.Second*1)
+ _, err := n.CallWithGpid(ctx, &base.Gpid{}, 0, arg, "RPC_NAME")
+ assert.Equal(t, context.DeadlineExceeded, err)
+
+ // Now simulate a session reset on the client side.
+ n.conn.Close()
+
+ // Wait for the session to become ready again — this should fail due to
the session reset.
+ // This part verifies that the session correctly reports
ERR_SESSION_RESET instead of
+ // a timeout or other error types, ensuring proper error handling and
distinction.
+ ctx, _ = context.WithTimeout(context.Background(), time.Second*1)
+ err = n.waitUntilSessionReady(ctx)
+ assert.Equal(t, base.ERR_SESSION_RESET, err)
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]