This is an automated email from the ASF dual-hosted git repository.
justxuewei pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/dubbo-go.git
The following commit(s) were added to refs/heads/main by this push:
new 55a06394e Fix/sentinel filter (#2621)
55a06394e is described below
commit 55a06394edb544cb0a54b8b697da3ae34b4f014c
Author: 不插电 <[email protected]>
AuthorDate: Fri Mar 22 15:45:42 2024 +0800
Fix/sentinel filter (#2621)
* fix: the bug of sentinel.TraceError and sentinel.Exit not being called,
and sentinel.InitDefault() not being called
* test: add test sentinel filter error count
* fix: The problem of sentinel.TraceError not being called in
sentinelConsumerFilter.
* style: adjust code style.
---
filter/sentinel/filter.go | 56 +++++++++++++++++++--------------
filter/sentinel/filter_test.go | 71 ++++++++++++++++++++++++++++++++++++++++++
2 files changed, 103 insertions(+), 24 deletions(-)
diff --git a/filter/sentinel/filter.go b/filter/sentinel/filter.go
index 009e77dca..5b2d8cecd 100644
--- a/filter/sentinel/filter.go
+++ b/filter/sentinel/filter.go
@@ -52,6 +52,10 @@ func init() {
}
}
+var (
+ initOnce sync.Once
+)
+
type DubboLoggerWrapper struct {
logger.Logger
}
@@ -88,19 +92,6 @@ func (d DubboLoggerWrapper) ErrorEnabled() bool {
return true
}
-func sentinelExit(ctx context.Context, result protocol.Result) {
- if methodEntry := ctx.Value(MethodEntryKey); methodEntry != nil {
- e := methodEntry.(*base.SentinelEntry)
- sentinel.TraceError(e, result.Error())
- e.Exit()
- }
- if interfaceEntry := ctx.Value(InterfaceEntryKey); interfaceEntry !=
nil {
- e := interfaceEntry.(*base.SentinelEntry)
- sentinel.TraceError(e, result.Error())
- e.Exit()
- }
-}
-
var (
providerOnce sync.Once
sentinelProvider *sentinelProviderFilter
@@ -110,6 +101,11 @@ type sentinelProviderFilter struct{}
func newSentinelProviderFilter() filter.Filter {
if sentinelProvider == nil {
+ initOnce.Do(func() {
+ if err := sentinel.InitDefault(); err != nil {
+ panic(err)
+ }
+ })
providerOnce.Do(func() {
sentinelProvider = &sentinelProviderFilter{}
})
@@ -130,7 +126,7 @@ func (d *sentinelProviderFilter) Invoke(ctx
context.Context, invoker protocol.In
// interface blocked
return sentinelDubboProviderFallback(ctx, invoker, invocation,
b)
}
- ctx = context.WithValue(ctx, InterfaceEntryKey, interfaceEntry)
+ defer interfaceEntry.Exit()
methodEntry, b = sentinel.Entry(methodResourceName,
sentinel.WithResourceType(base.ResTypeRPC),
@@ -140,12 +136,18 @@ func (d *sentinelProviderFilter) Invoke(ctx
context.Context, invoker protocol.In
// method blocked
return sentinelDubboProviderFallback(ctx, invoker, invocation,
b)
}
- ctx = context.WithValue(ctx, MethodEntryKey, methodEntry)
- return invoker.Invoke(ctx, invocation)
+ defer methodEntry.Exit()
+
+ result := invoker.Invoke(ctx, invocation)
+ if result.Error() != nil {
+ sentinel.TraceError(interfaceEntry, result.Error())
+ sentinel.TraceError(methodEntry, result.Error())
+ }
+
+ return result
}
func (d *sentinelProviderFilter) OnResponse(ctx context.Context, result
protocol.Result, _ protocol.Invoker, _ protocol.Invocation) protocol.Result {
- sentinelExit(ctx, result)
return result
}
@@ -158,6 +160,11 @@ type sentinelConsumerFilter struct{}
func newSentinelConsumerFilter() filter.Filter {
if sentinelConsumer == nil {
+ initOnce.Do(func() {
+ if err := sentinel.InitDefault(); err != nil {
+ panic(err)
+ }
+ })
consumerOnce.Do(func() {
sentinelConsumer = &sentinelConsumerFilter{}
})
@@ -178,7 +185,7 @@ func (d *sentinelConsumerFilter) Invoke(ctx
context.Context, invoker protocol.In
// interface blocked
return sentinelDubboConsumerFallback(ctx, invoker, invocation,
b)
}
- ctx = context.WithValue(ctx, InterfaceEntryKey, interfaceEntry)
+ defer interfaceEntry.Exit()
methodEntry, b = sentinel.Entry(methodResourceName,
sentinel.WithResourceType(base.ResTypeRPC),
sentinel.WithTrafficType(base.Outbound),
sentinel.WithArgs(invocation.Arguments()...))
@@ -186,13 +193,17 @@ func (d *sentinelConsumerFilter) Invoke(ctx
context.Context, invoker protocol.In
// method blocked
return sentinelDubboConsumerFallback(ctx, invoker, invocation,
b)
}
- ctx = context.WithValue(ctx, MethodEntryKey, methodEntry)
+ defer methodEntry.Exit()
- return invoker.Invoke(ctx, invocation)
+ result := invoker.Invoke(ctx, invocation)
+ if result.Error() != nil {
+ sentinel.TraceError(interfaceEntry, result.Error())
+ sentinel.TraceError(methodEntry, result.Error())
+ }
+ return result
}
func (d *sentinelConsumerFilter) OnResponse(ctx context.Context, result
protocol.Result, _ protocol.Invoker, _ protocol.Invocation) protocol.Result {
- sentinelExit(ctx, result)
return result
}
@@ -220,9 +231,6 @@ func getDefaultDubboFallback() DubboFallback {
const (
DefaultProviderPrefix = "dubbo:provider:"
DefaultConsumerPrefix = "dubbo:consumer:"
-
- MethodEntryKey = constant.DubboCtxKey("$$sentinelMethodEntry")
- InterfaceEntryKey = constant.DubboCtxKey("$$sentinelInterfaceEntry")
)
func getResourceName(invoker protocol.Invoker, invocation protocol.Invocation,
prefix string) (interfaceResourceName, methodResourceName string) {
diff --git a/filter/sentinel/filter_test.go b/filter/sentinel/filter_test.go
index 36d9fa730..39ecb922f 100644
--- a/filter/sentinel/filter_test.go
+++ b/filter/sentinel/filter_test.go
@@ -19,9 +19,12 @@ package sentinel
import (
"context"
+ "github.com/alibaba/sentinel-golang/core/circuitbreaker"
+ "github.com/pkg/errors"
"sync"
"sync/atomic"
"testing"
+ "time"
)
import (
@@ -85,6 +88,74 @@ func TestSentinelFilter_QPS(t *testing.T) {
assert.True(t, atomic.LoadInt64(&block) <= 205 &&
atomic.LoadInt64(&block) >= 195)
}
+type ErrInvoker struct {
+ *protocol.BaseInvoker
+}
+
+func (ei *ErrInvoker) Invoke(context context.Context, invocation
protocol.Invocation) protocol.Result {
+ invoke := ei.BaseInvoker.Invoke(context, invocation)
+ invoke.SetError(errors.New("error"))
+ return invoke
+}
+
+type stateChangeTestListener struct {
+ OnTransformToOpenChan chan struct{}
+}
+
+func (s *stateChangeTestListener) OnTransformToClosed(prev
circuitbreaker.State, rule circuitbreaker.Rule) {
+}
+
+func (s *stateChangeTestListener) OnTransformToOpen(prev circuitbreaker.State,
rule circuitbreaker.Rule, snapshot interface{}) {
+ s.OnTransformToOpenChan <- struct{}{}
+}
+
+func (s *stateChangeTestListener) OnTransformToHalfOpen(prev
circuitbreaker.State, rule circuitbreaker.Rule) {
+}
+
+func TestSentinelFilter_ErrorCount(t *testing.T) {
+ url, err :=
common.NewURL("dubbo://127.0.0.1:20000/UserProvider?anyhost=true&" +
+ "version=1.0.0&group=myGroup&" +
+
"application=BDTService&category=providers&default.timeout=10000&dubbo=dubbo-provider-golang-1.0.0&"
+
+
"environment=dev&interface=com.test.user.UserProvider&ip=192.168.56.1&methods=GetUser%2C&"
+
+
"module=dubbogo+user-info+server&org=test.com&owner=ZX&pid=1447&revision=0.0.1&"
+
+
"side=provider&timeout=3000×tamp=1556509797245&bean.name=UserProvider")
+ assert.NoError(t, err)
+ mockInvoker := &ErrInvoker{protocol.NewBaseInvoker(url)}
+ _, methodResourceName := getResourceName(mockInvoker,
+ invocation.NewRPCInvocation("hi", []interface{}{"OK"},
make(map[string]interface{})), DefaultProviderPrefix)
+ mockInvocation := invocation.NewRPCInvocation("hi",
[]interface{}{"OK"}, make(map[string]interface{}))
+
+ // Register a state change listener so that we could observe the state
change of the internal circuit breaker.
+ listener := &stateChangeTestListener{}
+ listener.OnTransformToOpenChan = make(chan struct{}, 1)
+ circuitbreaker.RegisterStateChangeListeners(listener)
+ _, err = circuitbreaker.LoadRules([]*circuitbreaker.Rule{
+ // Statistic time span=0.9s, recoveryTimeout=3s,
maxErrorCount=50
+ {
+ Resource: methodResourceName,
+ Strategy: circuitbreaker.ErrorCount,
+ RetryTimeoutMs: 3000,
+ MinRequestAmount: 10,
+ StatIntervalMs: 900,
+ StatSlidingWindowBucketCount: 10,
+ Threshold: 50,
+ },
+ })
+ assert.NoError(t, err)
+
+ f := &sentinelProviderFilter{}
+ for i := 0; i < 50; i++ {
+ result := f.Invoke(context.TODO(), mockInvoker, mockInvocation)
+ assert.Error(t, result.Error())
+ }
+ select {
+ case <-time.After(time.Second):
+ t.Error()
+ case <-listener.OnTransformToOpenChan:
+ }
+
+}
+
func TestConsumerFilter_Invoke(t *testing.T) {
f := &sentinelConsumerFilter{}
url, err :=
common.NewURL("dubbo://127.0.0.1:20000/UserProvider?anyhost=true&" +