This is an automated email from the ASF dual-hosted git repository.

liujun pushed a commit to branch feature-triple
in repository https://gitbox.apache.org/repos/asf/dubbo-go.git


The following commit(s) were added to refs/heads/feature-triple by this push:
     new 223587cfb fixes #2448, put CallOptions into invocation attachments 
(#2449)
223587cfb is described below

commit 223587cfb022ce2c52c2237524d02df630b80cc2
Author: Ken Liu <[email protected]>
AuthorDate: Wed Oct 18 14:28:08 2023 +0800

    fixes #2448, put CallOptions into invocation attachments (#2449)
---
 client/client.go                            |  3 +-
 client/options.go                           |  2 +
 cluster/cluster/failback/cluster_invoker.go | 76 +++++++++++++++++------------
 protocol/dubbo/dubbo_invoker.go             | 27 +++++-----
 protocol/invocation/rpcinvocation.go        | 10 ++++
 5 files changed, 73 insertions(+), 45 deletions(-)

diff --git a/client/client.go b/client/client.go
index 8e46a024b..2ba29b904 100644
--- a/client/client.go
+++ b/client/client.go
@@ -108,7 +108,8 @@ func (cli *Client) Init(info *ClientInfo) error {
 func generateInvocation(methodName string, paramsRawVals []interface{}, 
callType string, opts *CallOptions) (protocol.Invocation, error) {
        inv := invocation_impl.NewRPCInvocationWithOptions(
                invocation_impl.WithMethodName(methodName),
-               // todo: process opts
+               invocation_impl.WithAttachment(constant.TimeoutKey, 
opts.RequestTimeout),
+               invocation_impl.WithAttachment(constant.RetriesKey, 
opts.Retries),
                invocation_impl.WithParameterRawValues(paramsRawVals),
        )
        inv.SetAttribute(constant.CallTypeKey, callType)
diff --git a/client/options.go b/client/options.go
index 53a96fb24..2b88f5790 100644
--- a/client/options.go
+++ b/client/options.go
@@ -414,12 +414,14 @@ func newDefaultCallOptions() *CallOptions {
        }
 }
 
+// WithCallRequestTimeout the maximum waiting time for one specific call, only 
works for 'tri' and 'dubbo' protocol
 func WithCallRequestTimeout(timeout string) CallOption {
        return func(opts *CallOptions) {
                opts.RequestTimeout = timeout
        }
 }
 
+// WithCallRetries the maximum retry times on request failure for one specific 
call, only works for 'tri' and 'dubbo' protocol
 func WithCallRetries(retries string) CallOption {
        return func(opts *CallOptions) {
                opts.Retries = retries
diff --git a/cluster/cluster/failback/cluster_invoker.go 
b/cluster/cluster/failback/cluster_invoker.go
index f647e33a6..cf27f30dd 100644
--- a/cluster/cluster/failback/cluster_invoker.go
+++ b/cluster/cluster/failback/cluster_invoker.go
@@ -83,7 +83,8 @@ func (invoker *failbackClusterInvoker) tryTimerTaskProc(ctx 
context.Context, ret
        result := retryInvoker.Invoke(ctx, retryTask.invocation)
        if result.Error() != nil {
                retryTask.lastInvoker = retryInvoker
-               invoker.checkRetry(retryTask, result.Error())
+               retryTask.lastErr = result.Error()
+               retryTask.checkRetry()
        }
 }
 
@@ -115,22 +116,6 @@ func (invoker *failbackClusterInvoker) process(ctx 
context.Context) {
        }
 }
 
-func (invoker *failbackClusterInvoker) checkRetry(retryTask *retryTimerTask, 
err error) {
-       logger.Errorf("Failed retry to invoke the method %v in the service %v, 
wait again. The exception: %v.\n",
-               retryTask.invocation.MethodName(), invoker.GetURL().Service(), 
err.Error())
-       retryTask.retries++
-       retryTask.lastT = time.Now()
-       if retryTask.retries > invoker.maxRetries {
-               logger.Errorf("Failed retry times exceed threshold (%v), We 
have to abandon, invocation-> %v.\n",
-                       retryTask.retries, retryTask.invocation)
-               return
-       }
-
-       if err := invoker.taskList.Put(retryTask); err != nil {
-               logger.Errorf("invoker.taskList.Put(retryTask:%#v) = error:%v", 
retryTask, err)
-       }
-}
-
 // nolint
 func (invoker *failbackClusterInvoker) Invoke(ctx context.Context, invocation 
protocol.Invocation) protocol.Result {
        invokers := invoker.Directory.List(invocation)
@@ -166,7 +151,7 @@ func (invoker *failbackClusterInvoker) Invoke(ctx 
context.Context, invocation pr
                        return &protocol.RPCResult{}
                }
 
-               timerTask := newRetryTimerTask(loadBalance, invocation, 
invokers, ivk)
+               timerTask := newRetryTimerTask(loadBalance, invocation, 
invokers, ivk, invoker)
                invoker.taskList.Put(timerTask)
 
                logger.Errorf("Failback to invoke the method %v in the service 
%v, wait for retry in background. Ignored exception: %v.\n",
@@ -189,21 +174,50 @@ func (invoker *failbackClusterInvoker) Destroy() {
 }
 
 type retryTimerTask struct {
-       loadbalance loadbalance.LoadBalance
-       invocation  protocol.Invocation
-       invokers    []protocol.Invoker
-       lastInvoker protocol.Invoker
-       retries     int64
-       lastT       time.Time
+       loadbalance    loadbalance.LoadBalance
+       invocation     protocol.Invocation
+       invokers       []protocol.Invoker
+       lastInvoker    protocol.Invoker
+       retries        int64
+       maxRetries     int64
+       lastT          time.Time
+       clusterInvoker *failbackClusterInvoker
+       lastErr        error
+}
+
+func (t *retryTimerTask) checkRetry() {
+       logger.Errorf("Failed retry to invoke the method %v in the service %v, 
wait again. The exception: %v.\n",
+               t.invocation.MethodName(), t.clusterInvoker.GetURL().Service(), 
t.lastErr)
+       t.retries++
+       t.lastT = time.Now()
+       if t.retries > t.maxRetries {
+               logger.Errorf("Retry times exceed threshold (%v), invocation-> 
%v.\n",
+                       t.retries, t.invocation)
+               return
+       }
+
+       if err := t.clusterInvoker.taskList.Put(t); err != nil {
+               logger.Errorf("invoker.taskList.Put(retryTask:%#v) = error:%v", 
t, err)
+       }
 }
 
 func newRetryTimerTask(loadbalance loadbalance.LoadBalance, invocation 
protocol.Invocation, invokers []protocol.Invoker,
-       lastInvoker protocol.Invoker) *retryTimerTask {
-       return &retryTimerTask{
-               loadbalance: loadbalance,
-               invocation:  invocation,
-               invokers:    invokers,
-               lastInvoker: lastInvoker,
-               lastT:       time.Now(),
+       lastInvoker protocol.Invoker, cInvoker *failbackClusterInvoker) 
*retryTimerTask {
+       task := &retryTimerTask{
+               loadbalance:    loadbalance,
+               invocation:     invocation,
+               invokers:       invokers,
+               lastInvoker:    lastInvoker,
+               lastT:          time.Now(),
+               clusterInvoker: cInvoker,
+       }
+
+       if retries, ok := invocation.GetAttachment(constant.RetriesKey); ok {
+               rInt, _ := strconv.Atoi(retries)
+               task.maxRetries = int64(rInt)
+       } else {
+               task.maxRetries = cInvoker.maxRetries
        }
+
+       return task
 }
diff --git a/protocol/dubbo/dubbo_invoker.go b/protocol/dubbo/dubbo_invoker.go
index 6f7275366..4a633a17c 100644
--- a/protocol/dubbo/dubbo_invoker.go
+++ b/protocol/dubbo/dubbo_invoker.go
@@ -163,21 +163,22 @@ func (di *DubboInvoker) Invoke(ctx context.Context, ivc 
protocol.Invocation) pro
 
 // get timeout including methodConfig
 func (di *DubboInvoker) getTimeout(ivc *invocation.RPCInvocation) 
time.Duration {
-       methodName := ivc.MethodName()
-       if di.GetURL().GetParamBool(constant.GenericKey, false) {
-               methodName = ivc.Arguments()[0].(string)
-       }
-       timeout := 
di.GetURL().GetParam(strings.Join([]string{constant.MethodKeys, methodName, 
constant.TimeoutKey}, "."), "")
-       if len(timeout) != 0 {
-               if t, err := time.ParseDuration(timeout); err == nil {
-                       // config timeout into attachment
-                       ivc.SetAttachment(constant.TimeoutKey, 
strconv.Itoa(int(t.Milliseconds())))
-                       return t
+       timeout := di.timeout                                                
//default timeout
+       if attachTimeout, ok := ivc.GetAttachment(constant.TimeoutKey); ok { 
//check invocation timeout
+               timeout, _ = time.ParseDuration(attachTimeout)
+       } else { // check method timeout
+               methodName := ivc.MethodName()
+               if di.GetURL().GetParamBool(constant.GenericKey, false) {
+                       methodName = ivc.Arguments()[0].(string)
+               }
+               mTimeout := 
di.GetURL().GetParam(strings.Join([]string{constant.MethodKeys, methodName, 
constant.TimeoutKey}, "."), "")
+               if len(mTimeout) != 0 {
+                       timeout, _ = time.ParseDuration(mTimeout)
                }
+               // set timeout into invocation
+               ivc.SetAttachment(constant.TimeoutKey, 
strconv.Itoa(int(timeout.Milliseconds())))
        }
-       // set timeout into invocation at method level
-       ivc.SetAttachment(constant.TimeoutKey, 
strconv.Itoa(int(di.timeout.Milliseconds())))
-       return di.timeout
+       return timeout
 }
 
 func (di *DubboInvoker) IsAvailable() bool {
diff --git a/protocol/invocation/rpcinvocation.go 
b/protocol/invocation/rpcinvocation.go
index 10e1fa582..84393c8d9 100644
--- a/protocol/invocation/rpcinvocation.go
+++ b/protocol/invocation/rpcinvocation.go
@@ -335,6 +335,16 @@ func WithAttachments(attachments map[string]interface{}) 
option {
        }
 }
 
+// WithAttachment put a key-value pair into attachments.
+func WithAttachment(k string, v interface{}) option {
+       return func(invo *RPCInvocation) {
+               if invo.attachments == nil {
+                       invo.attachments = make(map[string]interface{})
+               }
+               invo.attachments[k] = v
+       }
+}
+
 // WithInvoker creates option with @invoker.
 func WithInvoker(invoker protocol.Invoker) option {
        return func(invo *RPCInvocation) {

Reply via email to