This is an automated email from the ASF dual-hosted git repository.
alexstocks pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/dubbo-go.git
The following commit(s) were added to refs/heads/develop by this push:
new 6efc27c5c fix(filter):solve side effects caused by 3184 refactor
(#3230)
6efc27c5c is described below
commit 6efc27c5ce47c83c2b3a07dc707e17f29504ad83
Author: 花国栋 <[email protected]>
AuthorDate: Wed Mar 4 02:43:45 2026 +0800
fix(filter):solve side effects caused by 3184 refactor (#3230)
* fix(filter):solve side effects caused by 3184 refactor
* fix(filter): switch URL storage from attachment to attribute
---
filter/active/filter.go | 36 +++++++++++++++++++++++++++++++-----
filter/active/filter_test.go | 32 ++++++++++++++++++--------------
2 files changed, 49 insertions(+), 19 deletions(-)
diff --git a/filter/active/filter.go b/filter/active/filter.go
index ab070fbf5..7e6c17e63 100644
--- a/filter/active/filter.go
+++ b/filter/active/filter.go
@@ -28,6 +28,7 @@ import (
)
import (
+ "dubbo.apache.org/dubbo-go/v3/common"
"dubbo.apache.org/dubbo-go/v3/common/constant"
"dubbo.apache.org/dubbo-go/v3/common/extension"
"dubbo.apache.org/dubbo-go/v3/filter"
@@ -38,6 +39,7 @@ import (
const (
dubboInvokeStartTime = "dubboInvokeStartTime"
+ dubboInvokeURL = "dubboInvokeURL"
)
var (
@@ -63,22 +65,46 @@ func newActiveFilter() filter.Filter {
// Invoke starts to record the requests status
func (f *activeFilter) Invoke(ctx context.Context, invoker base.Invoker, inv
base.Invocation) result.Result {
- inv.(*invocation.RPCInvocation).SetAttachment(dubboInvokeStartTime,
strconv.FormatInt(base.CurrentTimeMillis(), 10))
- base.BeginCount(invoker.GetURL(), inv.MethodName())
+ rpcInv := inv.(*invocation.RPCInvocation)
+
+ // Record the start time
+ rpcInv.SetAttachment(dubboInvokeStartTime,
strconv.FormatInt(base.CurrentTimeMillis(), 10))
+
+ // Cache the URL object to ensure BeginCount and EndCount use the same
URL instance
+ // This prevents statistics inconsistency when the invoker is destroyed
concurrently
+ url := invoker.GetURL()
+ rpcInv.SetAttribute(dubboInvokeURL, url)
+ base.BeginCount(url, inv.MethodName())
return invoker.Invoke(ctx, inv)
}
// OnResponse update the active count base on the request result.
func (f *activeFilter) OnResponse(ctx context.Context, result result.Result,
invoker base.Invoker, inv base.Invocation) result.Result {
- startTime, err :=
strconv.ParseInt(inv.(*invocation.RPCInvocation).GetAttachmentWithDefaultValue(dubboInvokeStartTime,
"0"), 10, 64)
+ rpcInv := inv.(*invocation.RPCInvocation)
+
+ /// Retrieve the cached URL from Invoke phase with a nil default value
+ rawUrl := rpcInv.GetAttributeWithDefaultValue(dubboInvokeURL, nil)
+
+ // Safely assert the type. If rawUrl is nil, ok will be false and panic
is avoided.
+ url, ok := rawUrl.(*common.URL)
+
+ // If the URL is missing or invalid, it means the Invoke phase was
likely interrupted.
+ // Skip EndCount to prevent statistic inconsistency or panics.
+ if !ok || url == nil {
+ logger.Warnf("activeFilter cannot get cached URL from
attribute, skip EndCount. Invoker may not have passed Invoke phase.")
+ return result
+ }
+
+ startTime, err :=
strconv.ParseInt(rpcInv.GetAttachmentWithDefaultValue(dubboInvokeStartTime,
"0"), 10, 64)
if err != nil {
result.SetError(err)
logger.Errorf("parse dubbo_invoke_start_time to int64 failed")
// When err is not nil, use default elapsed value of 1
- base.EndCount(invoker.GetURL(), inv.MethodName(), 1, false)
+ base.EndCount(url, inv.MethodName(), 1, false)
return result
}
+
elapsed := base.CurrentTimeMillis() - startTime
- base.EndCount(invoker.GetURL(), inv.MethodName(), elapsed,
result.Error() == nil)
+ base.EndCount(url, inv.MethodName(), elapsed, result.Error() == nil)
return result
}
diff --git a/filter/active/filter_test.go b/filter/active/filter_test.go
index f70bee173..b0a34b804 100644
--- a/filter/active/filter_test.go
+++ b/filter/active/filter_test.go
@@ -52,20 +52,24 @@ func TestFilterInvoke(t *testing.T) {
invoker.EXPECT().GetURL().Return(url).Times(1)
filter.Invoke(context.Background(), invoker, invoc)
assert.NotEmpty(t,
invoc.GetAttachmentWithDefaultValue(dubboInvokeStartTime, ""))
+ urlFromAttribute, ok := invoc.GetAttribute(dubboInvokeURL)
+ assert.True(t, ok, "dubboInvokeURL should be cached in attribute")
+ assert.Equal(t, url.Key(), urlFromAttribute.(*common.URL).Key(),
"Cached URL should match original URL")
+
}
func TestFilterOnResponse(t *testing.T) {
c := base.CurrentTimeMillis()
elapsed := 100
+ url, _ :=
common.NewURL(fmt.Sprintf("dubbo://%s:%d/com.alibaba.user.UserProvider",
constant.LocalHostValue, constant.DefaultPort))
invoc := invocation.NewRPCInvocation("test", []any{"OK"},
map[string]any{
dubboInvokeStartTime: strconv.FormatInt(c-int64(elapsed), 10),
})
- url, _ :=
common.NewURL(fmt.Sprintf("dubbo://%s:%d/com.ikurento.user.UserProvider",
constant.LocalHostValue, constant.DefaultPort))
+ invoc.SetAttribute(dubboInvokeURL, url)
filter := activeFilter{}
ctrl := gomock.NewController(t)
defer ctrl.Finish()
invoker := mock.NewMockInvoker(ctrl)
- invoker.EXPECT().GetURL().Return(url).Times(1)
rpcResult := &result.RPCResult{
Err: errors.New("test"),
}
@@ -88,20 +92,20 @@ func TestFilterOnResponse(t *testing.T) {
func TestFilterOnResponseWithDefer(t *testing.T) {
base.CleanAllStatus()
- // Test scenario 1: dubboInvokeStartTime is parsed successfully and the
result is correct.
+ // Test scenario 1: dubboInvokeStartTime is parsed successfully and
result is correct.
t.Run("ParseSuccessAndResultSuccess", func(t *testing.T) {
defer base.CleanAllStatus()
c := base.CurrentTimeMillis()
+ url, _ :=
common.NewURL(fmt.Sprintf("dubbo://%s:%d/com.alibaba.user.UserProvider",
constant.LocalHostValue, constant.DefaultPort))
invoc := invocation.NewRPCInvocation("test1", []any{"OK"},
map[string]any{
dubboInvokeStartTime: strconv.FormatInt(c, 10),
})
- url, _ :=
common.NewURL(fmt.Sprintf("dubbo://%s:%d/com.ikurento.user.UserProvider",
constant.LocalHostValue, constant.DefaultPort))
+ invoc.SetAttribute(dubboInvokeURL, url)
filter := activeFilter{}
ctrl := gomock.NewController(t)
defer ctrl.Finish()
invoker := mock.NewMockInvoker(ctrl)
- invoker.EXPECT().GetURL().Return(url).Times(1)
rpcResult := &result.RPCResult{}
filter.OnResponse(context.TODO(), rpcResult, invoker, invoc)
@@ -119,20 +123,20 @@ func TestFilterOnResponseWithDefer(t *testing.T) {
assert.GreaterOrEqual(t, urlStatus.GetTotalElapsed(), int64(0))
})
- // Test scenario 2: dubboInvokeStartTime is parsed successfully, but
the result is incorrect
+ // Test scenario 2: dubboInvokeStartTime is parsed successfully, but
result is incorrect
t.Run("ParseSuccessAndResultFailed", func(t *testing.T) {
defer base.CleanAllStatus()
c := base.CurrentTimeMillis()
+ url, _ :=
common.NewURL(fmt.Sprintf("dubbo://%s:%d/com.ikurento.user.UserProvider",
constant.LocalHostValue, constant.DefaultPort))
invoc := invocation.NewRPCInvocation("test2", []any{"OK"},
map[string]any{
dubboInvokeStartTime: strconv.FormatInt(c, 10),
})
- url, _ :=
common.NewURL(fmt.Sprintf("dubbo://%s:%d/com.ikurento.user.UserProvider",
constant.LocalHostValue, constant.DefaultPort))
+ invoc.SetAttribute(dubboInvokeURL, url)
filter := activeFilter{}
ctrl := gomock.NewController(t)
defer ctrl.Finish()
invoker := mock.NewMockInvoker(ctrl)
- invoker.EXPECT().GetURL().Return(url).Times(1)
rpcResult := &result.RPCResult{
Err: errors.New("test error"),
}
@@ -158,15 +162,15 @@ func TestFilterOnResponseWithDefer(t *testing.T) {
t.Run("ParseFailedWithInvalidString", func(t *testing.T) {
defer base.CleanAllStatus()
+ url, _ :=
common.NewURL(fmt.Sprintf("dubbo://%s:%d/com.ikurento.user.UserProvider",
constant.LocalHostValue, constant.DefaultPort))
invoc := invocation.NewRPCInvocation("test3", []any{"OK"},
map[string]any{
dubboInvokeStartTime: "invalid-time",
})
- url, _ :=
common.NewURL(fmt.Sprintf("dubbo://%s:%d/com.ikurento.user.UserProvider",
constant.LocalHostValue, constant.DefaultPort))
+ invoc.SetAttribute(dubboInvokeURL, url)
filter := activeFilter{}
ctrl := gomock.NewController(t)
defer ctrl.Finish()
invoker := mock.NewMockInvoker(ctrl)
- invoker.EXPECT().GetURL().Return(url).Times(1)
rpcResult := &result.RPCResult{}
result := filter.OnResponse(context.TODO(), rpcResult, invoker,
invoc)
@@ -175,7 +179,7 @@ func TestFilterOnResponseWithDefer(t *testing.T) {
methodStatus := base.GetMethodStatus(url, "test3")
urlStatus := base.GetURLStatus(url)
- // Verification count and status - should use the default
duration of 1 and be marked as failed
+ // Verification count and status - should use default duration
of 1 and be marked as failed
assert.Equal(t, int32(1), methodStatus.GetTotal())
assert.Equal(t, int32(1), urlStatus.GetTotal())
assert.Equal(t, int32(1), methodStatus.GetFailed())
@@ -186,17 +190,17 @@ func TestFilterOnResponseWithDefer(t *testing.T) {
assert.GreaterOrEqual(t, urlStatus.GetFailedElapsed(), int64(1))
})
- // Test scenario 4: dubboInvokeStartTime does not exist (use the
default value 0)
+ // Test scenario 4: dubboInvokeStartTime does not exist (use default
value 0)
t.Run("ParseFailedWithDefaultValue", func(t *testing.T) {
defer base.CleanAllStatus()
- invoc := invocation.NewRPCInvocation("test4", []any{"OK"},
make(map[string]any))
url, _ :=
common.NewURL(fmt.Sprintf("dubbo://%s:%d/com.ikurento.user.UserProvider",
constant.LocalHostValue, constant.DefaultPort))
+ invoc := invocation.NewRPCInvocation("test4", []any{"OK"},
map[string]any{})
+ invoc.SetAttribute(dubboInvokeURL, url)
filter := activeFilter{}
ctrl := gomock.NewController(t)
defer ctrl.Finish()
invoker := mock.NewMockInvoker(ctrl)
- invoker.EXPECT().GetURL().Return(url).Times(1)
rpcResult := &result.RPCResult{}
filter.OnResponse(context.TODO(), rpcResult, invoker, invoc)