This is an automated email from the ASF dual-hosted git repository.
yuchenhe pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git
The following commit(s) were added to refs/heads/master by this push:
new b4937faa6 fix(go-client): loopForRequest not return and retry forever (#1444)
b4937faa6 is described below
commit b4937faa6f07248cba6dd887b4e036b23c8357e2
Author: Yandi Lee <[email protected]>
AuthorDate: Tue Apr 18 15:16:17 2023 +0800
fix(go-client): loopForRequest not return and retry forever (#1444)
---
go-client/pegasus/table_connector.go | 1 +
go-client/pegasus/table_connector_test.go | 2 +-
go-client/session/session.go | 12 +++++++++++-
3 files changed, 13 insertions(+), 2 deletions(-)
diff --git a/go-client/pegasus/table_connector.go b/go-client/pegasus/table_connector.go
index 346bfebec..d1074a7bf 100644
--- a/go-client/pegasus/table_connector.go
+++ b/go-client/pegasus/table_connector.go
@@ -726,6 +726,7 @@ func (p *pegasusTableConnector) handleReplicaError(err error, replica *session.R
err = errors.New(err.Error() + " Rate of requests exceeds the throughput limit")
case base.ERR_INVALID_STATE:
err = errors.New(err.Error() + " The target replica is not primary")
+ retry = false
case base.ERR_OBJECT_NOT_FOUND:
err = errors.New(err.Error() + " The replica server doesn't serve this partition")
case base.ERR_SPLITTING:
diff --git a/go-client/pegasus/table_connector_test.go b/go-client/pegasus/table_connector_test.go
index 4aabe5eb4..1b2874765 100644
--- a/go-client/pegasus/table_connector_test.go
+++ b/go-client/pegasus/table_connector_test.go
@@ -261,7 +261,7 @@ func TestPegasusTableConnector_TriggerSelfUpdate(t *testing.T) {
<-ptb.confUpdateCh
assert.Error(t, err)
assert.True(t, confUpdate)
- assert.True(t, retry)
+ assert.False(t, retry)
confUpdate, retry, err = ptb.handleReplicaError(base.ERR_PARENT_PARTITION_MISUSED, nil)
<-ptb.confUpdateCh
diff --git a/go-client/session/session.go b/go-client/session/session.go
index 034d91c92..20aa585ae 100644
--- a/go-client/session/session.go
+++ b/go-client/session/session.go
@@ -226,8 +226,16 @@ func (n *nodeSession) notifyCallerAndDrop(req *requestListener) {
// single-routine worker used for sending requests.
// Any error occurred will end up this goroutine as well as the connection.
func (n *nodeSession) loopForRequest() error { // no error returned actually
+ //add ticker to trigger loop return
+ // since if correlative loopForResponse returned because of IsNetworkClosed(EOF),
+ // this loop will not receive any signal and runs forever
+ ticker := time.NewTicker(1 * time.Millisecond)
for {
select {
+ case <-ticker.C:
+ if n.conn.GetState() != rpc.ConnStateReady {
+ return nil
+ }
case <-n.tom.Dying():
return nil
case req := <-n.reqc:
@@ -335,7 +343,9 @@ func (n *nodeSession) waitUntilSessionReady(ctx context.Context) error {
}
if !ready {
- return fmt.Errorf("session %s is unable to connect (used %dms), the context error: %s", n, time.Since(dialStart)/time.Millisecond, ctx.Err())
+ //return error directly so that it can be recognized in `handleReplicaError`
+ n.logger.Printf("session %s is unable to connect (used %dms), the context error: %s", n, time.Since(dialStart)/time.Millisecond, ctx.Err())
+ return ctx.Err()
}
}
return nil
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]