This is an automated email from the ASF dual-hosted git repository.

justxuewei pushed a commit to branch feat-adasvc
in repository https://gitbox.apache.org/repos/asf/dubbo-go.git


The following commit(s) were added to refs/heads/feat-adasvc by this push:
     new 0acc52e  fix(adasvc): pass attachements with string
0acc52e is described below

commit 0acc52eebe95beee1abb4aa21691c1ad03034d63
Author: XavierNiu <[email protected]>
AuthorDate: Wed Dec 1 20:56:44 2021 +0800

    fix(adasvc): pass attachements with string
---
 cluster/cluster/adaptivesvc/cluster_invoker.go | 13 ++++++++++---
 cluster/metrics/local_metrics.go               |  8 ++++++--
 filter/adaptivesvc/filter.go                   |  4 ++--
 3 files changed, 18 insertions(+), 7 deletions(-)

diff --git a/cluster/cluster/adaptivesvc/cluster_invoker.go b/cluster/cluster/adaptivesvc/cluster_invoker.go
index fb6c7cd..0a10ae5 100644
--- a/cluster/cluster/adaptivesvc/cluster_invoker.go
+++ b/cluster/cluster/adaptivesvc/cluster_invoker.go
@@ -27,6 +27,7 @@ import (
        "dubbo.apache.org/dubbo-go/v3/common/logger"
        "dubbo.apache.org/dubbo-go/v3/protocol"
        perrors "github.com/pkg/errors"
+       "strconv"
 )
 
 type adaptiveServiceClusterInvoker struct {
@@ -61,12 +62,18 @@ func (ivk *adaptiveServiceClusterInvoker) Invoke(ctx context.Context, invocation
        result := invoker.Invoke(ctx, invocation)
 
        // update metrics
-       remaining := invocation.Attachments()[constant.AdaptiveServiceRemainingKey]
+       remainingStr := invocation.AttachmentsByKey(constant.AdaptiveServiceRemainingKey, "")
+       remaining, err := strconv.Atoi(remainingStr)
+       if err != nil {
+               logger.Warnf("the remaining is unexpected, we need a int type, but we got %d, err: %v.", remainingStr, err)
+               return result
+       }
        logger.Debugf("[adasvc cluster] The server status was received successfully, %s: %#v",
-               constant.AdaptiveServiceRemainingKey, remaining)
-       err := metrics.LocalMetrics.SetMethodMetrics(invoker.GetURL(),
+               constant.AdaptiveServiceRemainingKey, remainingStr)
+       err = metrics.LocalMetrics.SetMethodMetrics(invoker.GetURL(),
                invocation.MethodName(), metrics.HillClimbing, remaining)
        if err != nil {
+               logger.Warnf("adaptive service metrics update is failed, err: %v", err)
                return &protocol.RPCResult{Err: err}
        }
 
diff --git a/cluster/metrics/local_metrics.go b/cluster/metrics/local_metrics.go
index 9cd3a35..b3b5eac 100644
--- a/cluster/metrics/local_metrics.go
+++ b/cluster/metrics/local_metrics.go
@@ -31,18 +31,20 @@ func init() {
 
 type localMetrics struct {
        // protect metrics
-       lock    *sync.Mutex
+       lock    *sync.RWMutex
        metrics map[string]interface{}
 }
 
 func newLocalMetrics() *localMetrics {
        return &localMetrics{
-               lock:    &sync.Mutex{},
+               lock:    new(sync.RWMutex),
                metrics: make(map[string]interface{}),
        }
 }
 
 func (m *localMetrics) GetMethodMetrics(url *common.URL, methodName, key string) (interface{}, error) {
+       m.lock.RLock()
+       defer m.lock.RUnlock()
        metricsKey := fmt.Sprintf("%s.%s.%s.%s", getInstanceKey(url), getInvokerKey(url), methodName, key)
        if metrics, ok := m.metrics[metricsKey]; ok {
                return metrics, nil
@@ -51,6 +53,8 @@ func (m *localMetrics) GetMethodMetrics(url *common.URL, methodName, key string)
 }
 
 func (m *localMetrics) SetMethodMetrics(url *common.URL, methodName, key string, value interface{}) error {
+       m.lock.Lock()
+       defer m.lock.Unlock()
        metricsKey := fmt.Sprintf("%s.%s.%s.%s", getInstanceKey(url), getInvokerKey(url), methodName, key)
        m.metrics[metricsKey] = value
        return nil
diff --git a/filter/adaptivesvc/filter.go b/filter/adaptivesvc/filter.go
index 61d543d..0ac0b0b 100644
--- a/filter/adaptivesvc/filter.go
+++ b/filter/adaptivesvc/filter.go
@@ -107,8 +107,8 @@ func (f *adaptiveServiceProviderFilter) OnResponse(_ context.Context, result pro
        }
 
        // set attachments to inform consumer of provider status
-       invocation.SetAttachments(constant.AdaptiveServiceRemainingKey, l.Remaining())
-       invocation.SetAttachments(constant.AdaptiveServiceInflightKey, l.Inflight())
+       invocation.SetAttachments(constant.AdaptiveServiceRemainingKey, fmt.Sprintf("%d", l.Remaining()))
+       invocation.SetAttachments(constant.AdaptiveServiceInflightKey, fmt.Sprintf("%d", l.Inflight()))
        logger.Debugf("[adasvc filter] The attachments are set, %s: %d, %s: %d.",
                constant.AdaptiveServiceRemainingKey, l.Remaining(),
                constant.AdaptiveServiceInflightKey, l.Inflight())

Reply via email to