This is an automated email from the ASF dual-hosted git repository.

justxuewei pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/dubbo-go.git


The following commit(s) were added to refs/heads/main by this push:
     new 55a06394e Fix/sentinel filter (#2621)
55a06394e is described below

commit 55a06394edb544cb0a54b8b697da3ae34b4f014c
Author: 不插电 <[email protected]>
AuthorDate: Fri Mar 22 15:45:42 2024 +0800

    Fix/sentinel filter (#2621)
    
    * fix: the bug of sentinel.TraceError and sentinel.Exit not being called, 
and sentinel.InitDefault() not being called
    
    * test: add test sentinel filter error count
    
    * fix: The problem of sentinel.TraceError not being called in 
sentinelConsumerFilter.
    
    * style: adjust code style.
---
 filter/sentinel/filter.go      | 56 +++++++++++++++++++--------------
 filter/sentinel/filter_test.go | 71 ++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 103 insertions(+), 24 deletions(-)

diff --git a/filter/sentinel/filter.go b/filter/sentinel/filter.go
index 009e77dca..5b2d8cecd 100644
--- a/filter/sentinel/filter.go
+++ b/filter/sentinel/filter.go
@@ -52,6 +52,10 @@ func init() {
        }
 }
 
+var (
+       initOnce sync.Once
+)
+
 type DubboLoggerWrapper struct {
        logger.Logger
 }
@@ -88,19 +92,6 @@ func (d DubboLoggerWrapper) ErrorEnabled() bool {
        return true
 }
 
-func sentinelExit(ctx context.Context, result protocol.Result) {
-       if methodEntry := ctx.Value(MethodEntryKey); methodEntry != nil {
-               e := methodEntry.(*base.SentinelEntry)
-               sentinel.TraceError(e, result.Error())
-               e.Exit()
-       }
-       if interfaceEntry := ctx.Value(InterfaceEntryKey); interfaceEntry != 
nil {
-               e := interfaceEntry.(*base.SentinelEntry)
-               sentinel.TraceError(e, result.Error())
-               e.Exit()
-       }
-}
-
 var (
        providerOnce     sync.Once
        sentinelProvider *sentinelProviderFilter
@@ -110,6 +101,11 @@ type sentinelProviderFilter struct{}
 
 func newSentinelProviderFilter() filter.Filter {
        if sentinelProvider == nil {
+               initOnce.Do(func() {
+                       if err := sentinel.InitDefault(); err != nil {
+                               panic(err)
+                       }
+               })
                providerOnce.Do(func() {
                        sentinelProvider = &sentinelProviderFilter{}
                })
@@ -130,7 +126,7 @@ func (d *sentinelProviderFilter) Invoke(ctx 
context.Context, invoker protocol.In
                // interface blocked
                return sentinelDubboProviderFallback(ctx, invoker, invocation, 
b)
        }
-       ctx = context.WithValue(ctx, InterfaceEntryKey, interfaceEntry)
+       defer interfaceEntry.Exit()
 
        methodEntry, b = sentinel.Entry(methodResourceName,
                sentinel.WithResourceType(base.ResTypeRPC),
@@ -140,12 +136,18 @@ func (d *sentinelProviderFilter) Invoke(ctx 
context.Context, invoker protocol.In
                // method blocked
                return sentinelDubboProviderFallback(ctx, invoker, invocation, 
b)
        }
-       ctx = context.WithValue(ctx, MethodEntryKey, methodEntry)
-       return invoker.Invoke(ctx, invocation)
+       defer methodEntry.Exit()
+
+       result := invoker.Invoke(ctx, invocation)
+       if result.Error() != nil {
+               sentinel.TraceError(interfaceEntry, result.Error())
+               sentinel.TraceError(methodEntry, result.Error())
+       }
+
+       return result
 }
 
 func (d *sentinelProviderFilter) OnResponse(ctx context.Context, result 
protocol.Result, _ protocol.Invoker, _ protocol.Invocation) protocol.Result {
-       sentinelExit(ctx, result)
        return result
 }
 
@@ -158,6 +160,11 @@ type sentinelConsumerFilter struct{}
 
 func newSentinelConsumerFilter() filter.Filter {
        if sentinelConsumer == nil {
+               initOnce.Do(func() {
+                       if err := sentinel.InitDefault(); err != nil {
+                               panic(err)
+                       }
+               })
                consumerOnce.Do(func() {
                        sentinelConsumer = &sentinelConsumerFilter{}
                })
@@ -178,7 +185,7 @@ func (d *sentinelConsumerFilter) Invoke(ctx 
context.Context, invoker protocol.In
                // interface blocked
                return sentinelDubboConsumerFallback(ctx, invoker, invocation, 
b)
        }
-       ctx = context.WithValue(ctx, InterfaceEntryKey, interfaceEntry)
+       defer interfaceEntry.Exit()
 
        methodEntry, b = sentinel.Entry(methodResourceName, 
sentinel.WithResourceType(base.ResTypeRPC),
                sentinel.WithTrafficType(base.Outbound), 
sentinel.WithArgs(invocation.Arguments()...))
@@ -186,13 +193,17 @@ func (d *sentinelConsumerFilter) Invoke(ctx 
context.Context, invoker protocol.In
                // method blocked
                return sentinelDubboConsumerFallback(ctx, invoker, invocation, 
b)
        }
-       ctx = context.WithValue(ctx, MethodEntryKey, methodEntry)
+       defer methodEntry.Exit()
 
-       return invoker.Invoke(ctx, invocation)
+       result := invoker.Invoke(ctx, invocation)
+       if result.Error() != nil {
+               sentinel.TraceError(interfaceEntry, result.Error())
+               sentinel.TraceError(methodEntry, result.Error())
+       }
+       return result
 }
 
 func (d *sentinelConsumerFilter) OnResponse(ctx context.Context, result 
protocol.Result, _ protocol.Invoker, _ protocol.Invocation) protocol.Result {
-       sentinelExit(ctx, result)
        return result
 }
 
@@ -220,9 +231,6 @@ func getDefaultDubboFallback() DubboFallback {
 const (
        DefaultProviderPrefix = "dubbo:provider:"
        DefaultConsumerPrefix = "dubbo:consumer:"
-
-       MethodEntryKey    = constant.DubboCtxKey("$$sentinelMethodEntry")
-       InterfaceEntryKey = constant.DubboCtxKey("$$sentinelInterfaceEntry")
 )
 
 func getResourceName(invoker protocol.Invoker, invocation protocol.Invocation, 
prefix string) (interfaceResourceName, methodResourceName string) {
diff --git a/filter/sentinel/filter_test.go b/filter/sentinel/filter_test.go
index 36d9fa730..39ecb922f 100644
--- a/filter/sentinel/filter_test.go
+++ b/filter/sentinel/filter_test.go
@@ -19,9 +19,12 @@ package sentinel
 
 import (
        "context"
+       "github.com/alibaba/sentinel-golang/core/circuitbreaker"
+       "github.com/pkg/errors"
        "sync"
        "sync/atomic"
        "testing"
+       "time"
 )
 
 import (
@@ -85,6 +88,74 @@ func TestSentinelFilter_QPS(t *testing.T) {
        assert.True(t, atomic.LoadInt64(&block) <= 205 && 
atomic.LoadInt64(&block) >= 195)
 }
 
+type ErrInvoker struct {
+       *protocol.BaseInvoker
+}
+
+func (ei *ErrInvoker) Invoke(context context.Context, invocation 
protocol.Invocation) protocol.Result {
+       invoke := ei.BaseInvoker.Invoke(context, invocation)
+       invoke.SetError(errors.New("error"))
+       return invoke
+}
+
+type stateChangeTestListener struct {
+       OnTransformToOpenChan chan struct{}
+}
+
+func (s *stateChangeTestListener) OnTransformToClosed(prev 
circuitbreaker.State, rule circuitbreaker.Rule) {
+}
+
+func (s *stateChangeTestListener) OnTransformToOpen(prev circuitbreaker.State, 
rule circuitbreaker.Rule, snapshot interface{}) {
+       s.OnTransformToOpenChan <- struct{}{}
+}
+
+func (s *stateChangeTestListener) OnTransformToHalfOpen(prev 
circuitbreaker.State, rule circuitbreaker.Rule) {
+}
+
+func TestSentinelFilter_ErrorCount(t *testing.T) {
+       url, err := 
common.NewURL("dubbo://127.0.0.1:20000/UserProvider?anyhost=true&" +
+               "version=1.0.0&group=myGroup&" +
+               
"application=BDTService&category=providers&default.timeout=10000&dubbo=dubbo-provider-golang-1.0.0&"
 +
+               
"environment=dev&interface=com.test.user.UserProvider&ip=192.168.56.1&methods=GetUser%2C&"
 +
+               
"module=dubbogo+user-info+server&org=test.com&owner=ZX&pid=1447&revision=0.0.1&"
 +
+               
"side=provider&timeout=3000&timestamp=1556509797245&bean.name=UserProvider")
+       assert.NoError(t, err)
+       mockInvoker := &ErrInvoker{protocol.NewBaseInvoker(url)}
+       _, methodResourceName := getResourceName(mockInvoker,
+               invocation.NewRPCInvocation("hi", []interface{}{"OK"}, 
make(map[string]interface{})), DefaultProviderPrefix)
+       mockInvocation := invocation.NewRPCInvocation("hi", 
[]interface{}{"OK"}, make(map[string]interface{}))
+
+       // Register a state change listener so that we could observe the state 
change of the internal circuit breaker.
+       listener := &stateChangeTestListener{}
+       listener.OnTransformToOpenChan = make(chan struct{}, 1)
+       circuitbreaker.RegisterStateChangeListeners(listener)
+       _, err = circuitbreaker.LoadRules([]*circuitbreaker.Rule{
+               // Statistic time span=0.9s, recoveryTimeout=3s, 
maxErrorCount=50
+               {
+                       Resource:                     methodResourceName,
+                       Strategy:                     circuitbreaker.ErrorCount,
+                       RetryTimeoutMs:               3000,
+                       MinRequestAmount:             10,
+                       StatIntervalMs:               900,
+                       StatSlidingWindowBucketCount: 10,
+                       Threshold:                    50,
+               },
+       })
+       assert.NoError(t, err)
+
+       f := &sentinelProviderFilter{}
+       for i := 0; i < 50; i++ {
+               result := f.Invoke(context.TODO(), mockInvoker, mockInvocation)
+               assert.Error(t, result.Error())
+       }
+       select {
+       case <-time.After(time.Second):
+               t.Error()
+       case <-listener.OnTransformToOpenChan:
+       }
+
+}
+
 func TestConsumerFilter_Invoke(t *testing.T) {
        f := &sentinelConsumerFilter{}
        url, err := 
common.NewURL("dubbo://127.0.0.1:20000/UserProvider?anyhost=true&" +

Reply via email to