This is an automated email from the ASF dual-hosted git repository.
liujun pushed a commit to branch feature-triple
in repository https://gitbox.apache.org/repos/asf/dubbo-go.git
The following commit(s) were added to refs/heads/feature-triple by this push:
new 223587cfb fixes #2448, put CallOptions into invocation attachments (#2449)
223587cfb is described below
commit 223587cfb022ce2c52c2237524d02df630b80cc2
Author: Ken Liu <[email protected]>
AuthorDate: Wed Oct 18 14:28:08 2023 +0800
fixes #2448, put CallOptions into invocation attachments (#2449)
---
client/client.go | 3 +-
client/options.go | 2 +
cluster/cluster/failback/cluster_invoker.go | 76 +++++++++++++++++------------
protocol/dubbo/dubbo_invoker.go | 27 +++++-----
protocol/invocation/rpcinvocation.go | 10 ++++
5 files changed, 73 insertions(+), 45 deletions(-)
diff --git a/client/client.go b/client/client.go
index 8e46a024b..2ba29b904 100644
--- a/client/client.go
+++ b/client/client.go
@@ -108,7 +108,8 @@ func (cli *Client) Init(info *ClientInfo) error {
func generateInvocation(methodName string, paramsRawVals []interface{},
callType string, opts *CallOptions) (protocol.Invocation, error) {
inv := invocation_impl.NewRPCInvocationWithOptions(
invocation_impl.WithMethodName(methodName),
- // todo: process opts
+ invocation_impl.WithAttachment(constant.TimeoutKey,
opts.RequestTimeout),
+ invocation_impl.WithAttachment(constant.RetriesKey,
opts.Retries),
invocation_impl.WithParameterRawValues(paramsRawVals),
)
inv.SetAttribute(constant.CallTypeKey, callType)
diff --git a/client/options.go b/client/options.go
index 53a96fb24..2b88f5790 100644
--- a/client/options.go
+++ b/client/options.go
@@ -414,12 +414,14 @@ func newDefaultCallOptions() *CallOptions {
}
}
+// WithCallRequestTimeout the maximum waiting time for one specific call, only works for 'tri' and 'dubbo' protocol
func WithCallRequestTimeout(timeout string) CallOption {
return func(opts *CallOptions) {
opts.RequestTimeout = timeout
}
}
+// WithCallRetries the maximum retry times on request failure for one specific call, only works for 'tri' and 'dubbo' protocol
func WithCallRetries(retries string) CallOption {
return func(opts *CallOptions) {
opts.Retries = retries
diff --git a/cluster/cluster/failback/cluster_invoker.go b/cluster/cluster/failback/cluster_invoker.go
index f647e33a6..cf27f30dd 100644
--- a/cluster/cluster/failback/cluster_invoker.go
+++ b/cluster/cluster/failback/cluster_invoker.go
@@ -83,7 +83,8 @@ func (invoker *failbackClusterInvoker) tryTimerTaskProc(ctx context.Context, ret
result := retryInvoker.Invoke(ctx, retryTask.invocation)
if result.Error() != nil {
retryTask.lastInvoker = retryInvoker
- invoker.checkRetry(retryTask, result.Error())
+ retryTask.lastErr = result.Error()
+ retryTask.checkRetry()
}
}
@@ -115,22 +116,6 @@ func (invoker *failbackClusterInvoker) process(ctx context.Context) {
}
}
-func (invoker *failbackClusterInvoker) checkRetry(retryTask *retryTimerTask,
err error) {
- logger.Errorf("Failed retry to invoke the method %v in the service %v,
wait again. The exception: %v.\n",
- retryTask.invocation.MethodName(), invoker.GetURL().Service(),
err.Error())
- retryTask.retries++
- retryTask.lastT = time.Now()
- if retryTask.retries > invoker.maxRetries {
- logger.Errorf("Failed retry times exceed threshold (%v), We
have to abandon, invocation-> %v.\n",
- retryTask.retries, retryTask.invocation)
- return
- }
-
- if err := invoker.taskList.Put(retryTask); err != nil {
- logger.Errorf("invoker.taskList.Put(retryTask:%#v) = error:%v",
retryTask, err)
- }
-}
-
// nolint
func (invoker *failbackClusterInvoker) Invoke(ctx context.Context, invocation
protocol.Invocation) protocol.Result {
invokers := invoker.Directory.List(invocation)
@@ -166,7 +151,7 @@ func (invoker *failbackClusterInvoker) Invoke(ctx context.Context, invocation pr
return &protocol.RPCResult{}
}
- timerTask := newRetryTimerTask(loadBalance, invocation,
invokers, ivk)
+ timerTask := newRetryTimerTask(loadBalance, invocation,
invokers, ivk, invoker)
invoker.taskList.Put(timerTask)
logger.Errorf("Failback to invoke the method %v in the service
%v, wait for retry in background. Ignored exception: %v.\n",
@@ -189,21 +174,50 @@ func (invoker *failbackClusterInvoker) Destroy() {
}
type retryTimerTask struct {
- loadbalance loadbalance.LoadBalance
- invocation protocol.Invocation
- invokers []protocol.Invoker
- lastInvoker protocol.Invoker
- retries int64
- lastT time.Time
+ loadbalance loadbalance.LoadBalance
+ invocation protocol.Invocation
+ invokers []protocol.Invoker
+ lastInvoker protocol.Invoker
+ retries int64
+ maxRetries int64
+ lastT time.Time
+ clusterInvoker *failbackClusterInvoker
+ lastErr error
+}
+
+func (t *retryTimerTask) checkRetry() {
+ logger.Errorf("Failed retry to invoke the method %v in the service %v,
wait again. The exception: %v.\n",
+ t.invocation.MethodName(), t.clusterInvoker.GetURL().Service(),
t.lastErr)
+ t.retries++
+ t.lastT = time.Now()
+ if t.retries > t.maxRetries {
+ logger.Errorf("Retry times exceed threshold (%v), invocation->
%v.\n",
+ t.retries, t.invocation)
+ return
+ }
+
+ if err := t.clusterInvoker.taskList.Put(t); err != nil {
+ logger.Errorf("invoker.taskList.Put(retryTask:%#v) = error:%v",
t, err)
+ }
}
func newRetryTimerTask(loadbalance loadbalance.LoadBalance, invocation
protocol.Invocation, invokers []protocol.Invoker,
- lastInvoker protocol.Invoker) *retryTimerTask {
- return &retryTimerTask{
- loadbalance: loadbalance,
- invocation: invocation,
- invokers: invokers,
- lastInvoker: lastInvoker,
- lastT: time.Now(),
+ lastInvoker protocol.Invoker, cInvoker *failbackClusterInvoker)
*retryTimerTask {
+ task := &retryTimerTask{
+ loadbalance: loadbalance,
+ invocation: invocation,
+ invokers: invokers,
+ lastInvoker: lastInvoker,
+ lastT: time.Now(),
+ clusterInvoker: cInvoker,
+ }
+
+ if retries, ok := invocation.GetAttachment(constant.RetriesKey); ok {
+ rInt, _ := strconv.Atoi(retries)
+ task.maxRetries = int64(rInt)
+ } else {
+ task.maxRetries = cInvoker.maxRetries
}
+
+ return task
}
diff --git a/protocol/dubbo/dubbo_invoker.go b/protocol/dubbo/dubbo_invoker.go
index 6f7275366..4a633a17c 100644
--- a/protocol/dubbo/dubbo_invoker.go
+++ b/protocol/dubbo/dubbo_invoker.go
@@ -163,21 +163,22 @@ func (di *DubboInvoker) Invoke(ctx context.Context, ivc protocol.Invocation) pro
// get timeout including methodConfig
func (di *DubboInvoker) getTimeout(ivc *invocation.RPCInvocation)
time.Duration {
- methodName := ivc.MethodName()
- if di.GetURL().GetParamBool(constant.GenericKey, false) {
- methodName = ivc.Arguments()[0].(string)
- }
- timeout :=
di.GetURL().GetParam(strings.Join([]string{constant.MethodKeys, methodName,
constant.TimeoutKey}, "."), "")
- if len(timeout) != 0 {
- if t, err := time.ParseDuration(timeout); err == nil {
- // config timeout into attachment
- ivc.SetAttachment(constant.TimeoutKey,
strconv.Itoa(int(t.Milliseconds())))
- return t
+ timeout := di.timeout
//default timeout
+ if attachTimeout, ok := ivc.GetAttachment(constant.TimeoutKey); ok {
//check invocation timeout
+ timeout, _ = time.ParseDuration(attachTimeout)
+ } else { // check method timeout
+ methodName := ivc.MethodName()
+ if di.GetURL().GetParamBool(constant.GenericKey, false) {
+ methodName = ivc.Arguments()[0].(string)
+ }
+ mTimeout :=
di.GetURL().GetParam(strings.Join([]string{constant.MethodKeys, methodName,
constant.TimeoutKey}, "."), "")
+ if len(mTimeout) != 0 {
+ timeout, _ = time.ParseDuration(mTimeout)
}
+ // set timeout into invocation
+ ivc.SetAttachment(constant.TimeoutKey,
strconv.Itoa(int(timeout.Milliseconds())))
}
- // set timeout into invocation at method level
- ivc.SetAttachment(constant.TimeoutKey,
strconv.Itoa(int(di.timeout.Milliseconds())))
- return di.timeout
+ return timeout
}
func (di *DubboInvoker) IsAvailable() bool {
diff --git a/protocol/invocation/rpcinvocation.go b/protocol/invocation/rpcinvocation.go
index 10e1fa582..84393c8d9 100644
--- a/protocol/invocation/rpcinvocation.go
+++ b/protocol/invocation/rpcinvocation.go
@@ -335,6 +335,16 @@ func WithAttachments(attachments map[string]interface{}) option {
}
}
+// WithAttachment put a key-value pair into attachments.
+func WithAttachment(k string, v interface{}) option {
+ return func(invo *RPCInvocation) {
+ if invo.attachments == nil {
+ invo.attachments = make(map[string]interface{})
+ }
+ invo.attachments[k] = v
+ }
+}
+
// WithInvoker creates option with @invoker.
func WithInvoker(invoker protocol.Invoker) option {
return func(invo *RPCInvocation) {