This is an automated email from the ASF dual-hosted git repository.
victory pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/dubbo-go.git
The following commit(s) were added to refs/heads/develop by this push:
new 49c005a Ftr: [refer dubbo 2.7.6] attachment type from
map[string]stiring to map[string]interface{} (#713)
49c005a is described below
commit 49c005ab8f01f7e929a502d2a1309ddfcd61cda1
Author: cvictory <[email protected]>
AuthorDate: Mon Sep 7 10:54:46 2020 +0800
Ftr: [refer dubbo 2.7.6] attachment type from map[string]stiring to
map[string]interface{} (#713)
* support attachment from map[sting]string to map[string]interface{} in
invocation and result
---
.../zone_aware_cluster_invoker_test.go | 8 +-
cluster/router/tag/tag_router.go | 6 +-
common/proxy/proxy.go | 7 +-
common/proxy/proxy_test.go | 34 ++
filter/filter_impl/access_log_filter.go | 28 +-
filter/filter_impl/access_log_filter_test.go | 4 +-
filter/filter_impl/active_filter_test.go | 4 +-
.../filter_impl/auth/default_authenticator_test.go | 6 +-
filter/filter_impl/auth/provider_auth_test.go | 2 +-
filter/filter_impl/execute_limit_filter_test.go | 6 +-
.../filter_impl/graceful_shutdown_filter_test.go | 2 +-
filter/filter_impl/metrics_filter_test.go | 2 +-
filter/filter_impl/seata_filter_test.go | 7 +-
filter/filter_impl/token_filter.go | 2 +-
filter/filter_impl/token_filter_test.go | 8 +-
.../tps/tps_limiter_method_service_test.go | 8 +-
filter/filter_impl/tps_limit_filter_test.go | 6 +-
filter/filter_impl/tracing_filter_test.go | 2 +-
go.mod | 2 +-
go.sum | 10 +-
metadata/service/inmemory/service_proxy.go | 2 +-
metrics/prometheus/reporter_test.go | 2 +-
protocol/dubbo/client.go | 20 +-
protocol/dubbo/client_test.go | 32 +-
protocol/dubbo/codec.go | 21 +-
protocol/dubbo/codec_test.go | 17 +-
protocol/dubbo/dubbo_invoker.go | 3 +-
protocol/dubbo/dubbo_invoker_test.go | 2 +-
protocol/dubbo/hessian2/const.go | 243 +++++++++++++
protocol/dubbo/hessian2/hessian_dubbo.go | 251 ++++++++++++++
protocol/dubbo/hessian2/hessian_dubbo_test.go | 231 +++++++++++++
protocol/dubbo/hessian2/hessian_request.go | 350 +++++++++++++++++++
protocol/dubbo/hessian2/hessian_request_test.go | 158 +++++++++
protocol/dubbo/hessian2/hessian_response.go | 377 +++++++++++++++++++++
protocol/dubbo/hessian2/hessian_response_test.go | 225 ++++++++++++
protocol/dubbo/listener.go | 48 ++-
protocol/dubbo/listener_test.go | 5 +-
protocol/dubbo/opentracing.go | 60 ++++
protocol/dubbo/readwriter.go | 34 +-
protocol/invocation.go | 7 +-
protocol/invocation/rpcinvocation.go | 28 +-
protocol/jsonrpc/server.go | 2 +-
protocol/rest/server/rest_server.go | 2 +-
protocol/result.go | 19 +-
test/integrate/dubbo/go-client/go.mod | 4 +
test/integrate/dubbo/go-server/go.mod | 4 +
46 files changed, 2151 insertions(+), 150 deletions(-)
diff --git a/cluster/cluster_impl/zone_aware_cluster_invoker_test.go
b/cluster/cluster_impl/zone_aware_cluster_invoker_test.go
index cd201a4..7f77f33 100644
--- a/cluster/cluster_impl/zone_aware_cluster_invoker_test.go
+++ b/cluster/cluster_impl/zone_aware_cluster_invoker_test.go
@@ -44,7 +44,7 @@ func TestZoneWareInvokerWithPreferredSuccess(t *testing.T) {
//defer ctrl.Finish()
mockResult := &protocol.RPCResult{
- Attrs: map[string]string{constant.PREFERRED_KEY: "true"},
+ Attrs: map[string]interface{}{constant.PREFERRED_KEY: "true"},
Rest: rest{tried: 0, success: true}}
var invokers []protocol.Invoker
@@ -99,7 +99,7 @@ func TestZoneWareInvokerWithWeightSuccess(t *testing.T) {
invoker.EXPECT().Invoke(gomock.Any()).DoAndReturn(
func(invocation protocol.Invocation)
protocol.Result {
return &protocol.RPCResult{
- Attrs:
map[string]string{constant.WEIGHT_KEY: w1},
+ Attrs:
map[string]interface{}{constant.WEIGHT_KEY: w1},
Rest: rest{tried: 0, success:
true}}
}).MaxTimes(100)
} else {
@@ -107,7 +107,7 @@ func TestZoneWareInvokerWithWeightSuccess(t *testing.T) {
invoker.EXPECT().Invoke(gomock.Any()).DoAndReturn(
func(invocation protocol.Invocation)
protocol.Result {
return &protocol.RPCResult{
- Attrs:
map[string]string{constant.WEIGHT_KEY: w2},
+ Attrs:
map[string]interface{}{constant.WEIGHT_KEY: w2},
Rest: rest{tried: 0, success:
true}}
}).MaxTimes(100)
}
@@ -154,7 +154,7 @@ func TestZoneWareInvokerWithZoneSuccess(t *testing.T) {
invoker.EXPECT().Invoke(gomock.Any()).DoAndReturn(
func(invocation protocol.Invocation) protocol.Result {
return &protocol.RPCResult{
- Attrs:
map[string]string{constant.ZONE_KEY: zoneValue},
+ Attrs:
map[string]interface{}{constant.ZONE_KEY: zoneValue},
Rest: rest{tried: 0, success: true}}
})
invokers = append(invokers, invoker)
diff --git a/cluster/router/tag/tag_router.go b/cluster/router/tag/tag_router.go
index 12e6eda..9f92e83 100644
--- a/cluster/router/tag/tag_router.go
+++ b/cluster/router/tag/tag_router.go
@@ -361,7 +361,9 @@ func isAnyHost(addr string) bool {
func findTag(invocation protocol.Invocation, consumerUrl *common.URL) string {
tag, ok := invocation.Attachments()[constant.Tagkey]
if !ok {
- tag = consumerUrl.GetParam(constant.Tagkey, "")
+ return consumerUrl.GetParam(constant.Tagkey, "")
+ } else if v, t := tag.(string); t {
+ return v
}
- return tag
+ return ""
}
diff --git a/common/proxy/proxy.go b/common/proxy/proxy.go
index ce0f4d1..d51ce1c 100644
--- a/common/proxy/proxy.go
+++ b/common/proxy/proxy.go
@@ -145,12 +145,17 @@ func (p *Proxy) Implement(v common.RPCService) {
inv.SetAttachments(k, value)
}
- // add user setAttachment
+ // add user setAttachment. It is compatibility with
previous versions.
atm := invCtx.Value(constant.AttachmentKey)
if m, ok := atm.(map[string]string); ok {
for k, value := range m {
inv.SetAttachments(k, value)
}
+ } else if m2, ok2 := atm.(map[string]interface{}); ok2 {
+ // it is support to transfer
map[string]interface{}. It refers to dubbo-java 2.7.
+ for k, value := range m2 {
+ inv.SetAttachments(k, value)
+ }
}
result := p.invoke.Invoke(invCtx, inv)
diff --git a/common/proxy/proxy_test.go b/common/proxy/proxy_test.go
index 14b2bef..c606615 100644
--- a/common/proxy/proxy_test.go
+++ b/common/proxy/proxy_test.go
@@ -24,6 +24,7 @@ import (
)
import (
+ "github.com/apache/dubbo-go/protocol/invocation"
perrors "github.com/pkg/errors"
"github.com/stretchr/testify/assert"
)
@@ -32,6 +33,7 @@ import (
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/protocol"
+ "github.com/apache/dubbo-go/protocol/dubbo/hessian2"
)
type TestService struct {
@@ -40,6 +42,7 @@ type TestService struct {
MethodThree func(int, bool) (interface{}, error)
MethodFour func(int, bool) (*interface{}, error) `dubbo:"methodFour"`
MethodFive func() error
+ MethodSix func(context.Context, string) (interface{}, error)
Echo func(interface{}, *interface{}) error
}
@@ -120,3 +123,34 @@ func TestProxyImplement(t *testing.T) {
assert.Nil(t, s3.MethodOne)
}
+
+func TestProxyImplementForContext(t *testing.T) {
+ invoker := &TestProxyInvoker{
+ BaseInvoker: *protocol.NewBaseInvoker(common.URL{}),
+ }
+ p := NewProxy(invoker, nil, map[string]string{constant.ASYNC_KEY:
"false"})
+ s := &TestService{}
+ p.Implement(s)
+ attahments1 := make(map[string]interface{}, 4)
+ attahments1["k1"] = "v1"
+ attahments1["k2"] = "v2"
+ context := context.WithValue(context.Background(),
constant.AttachmentKey, attahments1)
+ r, err := p.Get().(*TestService).MethodSix(context, "xxx")
+ v1 := r.(map[string]interface{})
+ assert.NoError(t, err)
+ assert.Equal(t, v1["TestProxyInvoker"], "TestProxyInvokerValue")
+}
+
+type TestProxyInvoker struct {
+ protocol.BaseInvoker
+}
+
+func (bi *TestProxyInvoker) Invoke(context context.Context, inv
protocol.Invocation) protocol.Result {
+ rpcInv := inv.(*invocation.RPCInvocation)
+ mapV := inv.Attachments()
+ mapV["TestProxyInvoker"] = "TestProxyInvokerValue"
+ hessian2.ReflectResponse(mapV, rpcInv.Reply())
+ return &protocol.RPCResult{
+ Rest: inv.Arguments(),
+ }
+}
diff --git a/filter/filter_impl/access_log_filter.go
b/filter/filter_impl/access_log_filter.go
index 621012c..6eaf9cb 100644
--- a/filter/filter_impl/access_log_filter.go
+++ b/filter/filter_impl/access_log_filter.go
@@ -105,13 +105,27 @@ func (ef *AccessLogFilter) logIntoChannel(accessLogData
AccessLogData) {
func (ef *AccessLogFilter) buildAccessLogData(_ protocol.Invoker, invocation
protocol.Invocation) map[string]string {
dataMap := make(map[string]string, 16)
attachments := invocation.Attachments()
- dataMap[constant.INTERFACE_KEY] = attachments[constant.INTERFACE_KEY]
- dataMap[constant.METHOD_KEY] = invocation.MethodName()
- dataMap[constant.VERSION_KEY] = attachments[constant.VERSION_KEY]
- dataMap[constant.GROUP_KEY] = attachments[constant.GROUP_KEY]
- dataMap[constant.TIMESTAMP_KEY] = time.Now().Format(MessageDateLayout)
- dataMap[constant.LOCAL_ADDR], _ = attachments[constant.LOCAL_ADDR]
- dataMap[constant.REMOTE_ADDR], _ = attachments[constant.REMOTE_ADDR]
+ if v, ok := attachments[constant.INTERFACE_KEY]; ok && v != nil {
+ dataMap[constant.INTERFACE_KEY] = v.(string)
+ }
+ if v, ok := attachments[constant.METHOD_KEY]; ok && v != nil {
+ dataMap[constant.METHOD_KEY] = v.(string)
+ }
+ if v, ok := attachments[constant.VERSION_KEY]; ok && v != nil {
+ dataMap[constant.VERSION_KEY] = v.(string)
+ }
+ if v, ok := attachments[constant.GROUP_KEY]; ok && v != nil {
+ dataMap[constant.GROUP_KEY] = v.(string)
+ }
+ if v, ok := attachments[constant.TIMESTAMP_KEY]; ok && v != nil {
+ dataMap[constant.TIMESTAMP_KEY] = v.(string)
+ }
+ if v, ok := attachments[constant.LOCAL_ADDR]; ok && v != nil {
+ dataMap[constant.LOCAL_ADDR] = v.(string)
+ }
+ if v, ok := attachments[constant.REMOTE_ADDR]; ok && v != nil {
+ dataMap[constant.REMOTE_ADDR] = v.(string)
+ }
if len(invocation.Arguments()) > 0 {
builder := strings.Builder{}
diff --git a/filter/filter_impl/access_log_filter_test.go
b/filter/filter_impl/access_log_filter_test.go
index 55c328c..a3a6151 100644
--- a/filter/filter_impl/access_log_filter_test.go
+++ b/filter/filter_impl/access_log_filter_test.go
@@ -45,7 +45,7 @@ func TestAccessLogFilter_Invoke_Not_Config(t *testing.T) {
"service.filter=echo%2Ctoken%2Caccesslog×tamp=1569153406&token=934804bf-b007-4174-94eb-96e3e1d60cc7&version=&warmup=100")
invoker := protocol.NewBaseInvoker(url)
- attach := make(map[string]string, 10)
+ attach := make(map[string]interface{}, 10)
inv := invocation.NewRPCInvocation("MethodName", []interface{}{"OK",
"Hello"}, attach)
accessLogFilter := GetAccessLogFilter()
@@ -64,7 +64,7 @@ func TestAccessLogFilterInvokeDefaultConfig(t *testing.T) {
"service.filter=echo%2Ctoken%2Caccesslog×tamp=1569153406&token=934804bf-b007-4174-94eb-96e3e1d60cc7&version=&warmup=100")
invoker := protocol.NewBaseInvoker(url)
- attach := make(map[string]string, 10)
+ attach := make(map[string]interface{}, 10)
attach[constant.VERSION_KEY] = "1.0"
attach[constant.GROUP_KEY] = "MyGroup"
inv := invocation.NewRPCInvocation("MethodName", []interface{}{"OK",
"Hello"}, attach)
diff --git a/filter/filter_impl/active_filter_test.go
b/filter/filter_impl/active_filter_test.go
index 6b72830..9837a49 100644
--- a/filter/filter_impl/active_filter_test.go
+++ b/filter/filter_impl/active_filter_test.go
@@ -37,7 +37,7 @@ import (
)
func TestActiveFilterInvoke(t *testing.T) {
- invoc := invocation.NewRPCInvocation("test", []interface{}{"OK"},
make(map[string]string, 0))
+ invoc := invocation.NewRPCInvocation("test", []interface{}{"OK"},
make(map[string]interface{}, 0))
url, _ :=
common.NewURL("dubbo://192.168.10.10:20000/com.ikurento.user.UserProvider")
filter := ActiveFilter{}
ctrl := gomock.NewController(t)
@@ -53,7 +53,7 @@ func TestActiveFilterInvoke(t *testing.T) {
func TestActiveFilterOnResponse(t *testing.T) {
c := protocol.CurrentTimeMillis()
elapsed := 100
- invoc := invocation.NewRPCInvocation("test", []interface{}{"OK"},
map[string]string{
+ invoc := invocation.NewRPCInvocation("test", []interface{}{"OK"},
map[string]interface{}{
dubboInvokeStartTime: strconv.FormatInt(c-int64(elapsed), 10),
})
url, _ :=
common.NewURL("dubbo://192.168.10.10:20000/com.ikurento.user.UserProvider")
diff --git a/filter/filter_impl/auth/default_authenticator_test.go
b/filter/filter_impl/auth/default_authenticator_test.go
index 5b107b5..8b0fb6b 100644
--- a/filter/filter_impl/auth/default_authenticator_test.go
+++ b/filter/filter_impl/auth/default_authenticator_test.go
@@ -52,7 +52,7 @@ func TestDefaultAuthenticator_Authenticate(t *testing.T) {
var authenticator = &DefaultAuthenticator{}
- invcation := invocation.NewRPCInvocation("test", parmas,
map[string]string{
+ invcation := invocation.NewRPCInvocation("test", parmas,
map[string]interface{}{
constant.REQUEST_SIGNATURE_KEY: signature,
constant.CONSUMER: "test",
constant.REQUEST_TIMESTAMP_KEY: requestTime,
@@ -61,7 +61,7 @@ func TestDefaultAuthenticator_Authenticate(t *testing.T) {
err := authenticator.Authenticate(invcation, &testurl)
assert.Nil(t, err)
// modify the params
- invcation = invocation.NewRPCInvocation("test", parmas[:1],
map[string]string{
+ invcation = invocation.NewRPCInvocation("test", parmas[:1],
map[string]interface{}{
constant.REQUEST_SIGNATURE_KEY: signature,
constant.CONSUMER: "test",
constant.REQUEST_TIMESTAMP_KEY: requestTime,
@@ -119,7 +119,7 @@ func Test_getAccessKeyPairFailed(t *testing.T) {
func Test_getSignatureWithinParams(t *testing.T) {
testurl, _ :=
common.NewURL("dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider?interface=com.ikurento.user.UserProvider&group=gg&version=2.6.0")
testurl.SetParam(constant.PARAMTER_SIGNATURE_ENABLE_KEY, "true")
- inv := invocation.NewRPCInvocation("test", []interface{}{"OK"},
map[string]string{
+ inv := invocation.NewRPCInvocation("test", []interface{}{"OK"},
map[string]interface{}{
"": "",
})
secret := "dubbo"
diff --git a/filter/filter_impl/auth/provider_auth_test.go
b/filter/filter_impl/auth/provider_auth_test.go
index 626782a..f6ebfcd 100644
--- a/filter/filter_impl/auth/provider_auth_test.go
+++ b/filter/filter_impl/auth/provider_auth_test.go
@@ -54,7 +54,7 @@ func TestProviderAuthFilter_Invoke(t *testing.T) {
requestTime := strconv.Itoa(int(time.Now().Unix() * 1000))
signature, _ := getSignature(&url, inv, secret, requestTime)
- inv = invocation.NewRPCInvocation("test", []interface{}{"OK"},
map[string]string{
+ inv = invocation.NewRPCInvocation("test", []interface{}{"OK"},
map[string]interface{}{
constant.REQUEST_SIGNATURE_KEY: signature,
constant.CONSUMER: "test",
constant.REQUEST_TIMESTAMP_KEY: requestTime,
diff --git a/filter/filter_impl/execute_limit_filter_test.go
b/filter/filter_impl/execute_limit_filter_test.go
index d36d6ed..953f5e1 100644
--- a/filter/filter_impl/execute_limit_filter_test.go
+++ b/filter/filter_impl/execute_limit_filter_test.go
@@ -36,7 +36,7 @@ import (
func TestExecuteLimitFilterInvokeIgnored(t *testing.T) {
methodName := "hello"
- invoc := invocation.NewRPCInvocation(methodName, []interface{}{"OK"},
make(map[string]string, 0))
+ invoc := invocation.NewRPCInvocation(methodName, []interface{}{"OK"},
make(map[string]interface{}, 0))
invokeUrl := common.NewURLWithOptions(
common.WithParams(url.Values{}),
@@ -51,7 +51,7 @@ func TestExecuteLimitFilterInvokeIgnored(t *testing.T) {
func TestExecuteLimitFilterInvokeConfigureError(t *testing.T) {
methodName := "hello1"
- invoc := invocation.NewRPCInvocation(methodName, []interface{}{"OK"},
make(map[string]string, 0))
+ invoc := invocation.NewRPCInvocation(methodName, []interface{}{"OK"},
make(map[string]interface{}, 0))
invokeUrl := common.NewURLWithOptions(
common.WithParams(url.Values{}),
@@ -68,7 +68,7 @@ func TestExecuteLimitFilterInvokeConfigureError(t *testing.T)
{
func TestExecuteLimitFilterInvoke(t *testing.T) {
methodName := "hello1"
- invoc := invocation.NewRPCInvocation(methodName, []interface{}{"OK"},
make(map[string]string, 0))
+ invoc := invocation.NewRPCInvocation(methodName, []interface{}{"OK"},
make(map[string]interface{}, 0))
invokeUrl := common.NewURLWithOptions(
common.WithParams(url.Values{}),
diff --git a/filter/filter_impl/graceful_shutdown_filter_test.go
b/filter/filter_impl/graceful_shutdown_filter_test.go
index 87ac2ea..220ef6f 100644
--- a/filter/filter_impl/graceful_shutdown_filter_test.go
+++ b/filter/filter_impl/graceful_shutdown_filter_test.go
@@ -39,7 +39,7 @@ import (
)
func TestGenericFilterInvoke(t *testing.T) {
- invoc := invocation.NewRPCInvocation("GetUser", []interface{}{"OK"},
make(map[string]string, 0))
+ invoc := invocation.NewRPCInvocation("GetUser", []interface{}{"OK"},
make(map[string]interface{}, 0))
invokeUrl := common.NewURLWithOptions(
common.WithParams(url.Values{}))
diff --git a/filter/filter_impl/metrics_filter_test.go
b/filter/filter_impl/metrics_filter_test.go
index 881106f..ac10d52 100644
--- a/filter/filter_impl/metrics_filter_test.go
+++ b/filter/filter_impl/metrics_filter_test.go
@@ -57,7 +57,7 @@ func TestMetricsFilterInvoke(t *testing.T) {
"service.filter=echo%2Ctoken%2Caccesslog×tamp=1569153406&token=934804bf-b007-4174-94eb-96e3e1d60cc7&version=&warmup=100")
invoker := protocol.NewBaseInvoker(url)
- attach := make(map[string]string, 10)
+ attach := make(map[string]interface{}, 10)
inv := invocation.NewRPCInvocation("MethodName", []interface{}{"OK",
"Hello"}, attach)
ctx := context.Background()
diff --git a/filter/filter_impl/seata_filter_test.go
b/filter/filter_impl/seata_filter_test.go
index 6c39897..45817e9 100644
--- a/filter/filter_impl/seata_filter_test.go
+++ b/filter/filter_impl/seata_filter_test.go
@@ -48,8 +48,9 @@ func (iv *testMockSeataInvoker) Invoke(ctx context.Context, _
protocol.Invocatio
func TestSeataFilter_Invoke(t *testing.T) {
filter := getSeataFilter()
- result := filter.Invoke(context.Background(), &testMockSeataInvoker{},
invocation.NewRPCInvocation("$echo", []interface{}{"OK"}, map[string]string{
- SEATA_XID: "10.30.21.227:8091:2000047792",
- }))
+ result := filter.Invoke(context.Background(), &testMockSeataInvoker{},
invocation.NewRPCInvocation("$echo",
+ []interface{}{"OK"}, map[string]interface{}{
+ SEATA_XID: "10.30.21.227:8091:2000047792",
+ }))
assert.Equal(t, "10.30.21.227:8091:2000047792", result.Result())
}
diff --git a/filter/filter_impl/token_filter.go
b/filter/filter_impl/token_filter.go
index fe4e387..b5e0560 100644
--- a/filter/filter_impl/token_filter.go
+++ b/filter/filter_impl/token_filter.go
@@ -51,7 +51,7 @@ func (tf *TokenFilter) Invoke(ctx context.Context, invoker
protocol.Invoker, inv
if len(invokerTkn) > 0 {
attachs := invocation.Attachments()
remoteTkn, exist := attachs[constant.TOKEN_KEY]
- if exist && strings.EqualFold(invokerTkn, remoteTkn) {
+ if exist && remoteTkn != nil && strings.EqualFold(invokerTkn,
remoteTkn.(string)) {
return invoker.Invoke(ctx, invocation)
}
return &protocol.RPCResult{Err: perrors.Errorf("Invalid token!
Forbid invoke remote service %v method %s ",
diff --git a/filter/filter_impl/token_filter_test.go
b/filter/filter_impl/token_filter_test.go
index c2f69bd..cd1bba3 100644
--- a/filter/filter_impl/token_filter_test.go
+++ b/filter/filter_impl/token_filter_test.go
@@ -40,7 +40,7 @@ func TestTokenFilterInvoke(t *testing.T) {
url := common.NewURLWithOptions(
common.WithParams(url.Values{}),
common.WithParamsValue(constant.TOKEN_KEY, "ori_key"))
- attch := make(map[string]string, 0)
+ attch := make(map[string]interface{}, 0)
attch[constant.TOKEN_KEY] = "ori_key"
result := filter.Invoke(context.Background(),
protocol.NewBaseInvoker(*url),
@@ -54,7 +54,7 @@ func TestTokenFilterInvokeEmptyToken(t *testing.T) {
filter := GetTokenFilter()
testUrl := common.URL{}
- attch := make(map[string]string, 0)
+ attch := make(map[string]interface{}, 0)
attch[constant.TOKEN_KEY] = "ori_key"
result := filter.Invoke(context.Background(),
protocol.NewBaseInvoker(testUrl), invocation.NewRPCInvocation("MethodName",
[]interface{}{"OK"}, attch))
assert.Nil(t, result.Error())
@@ -67,7 +67,7 @@ func TestTokenFilterInvokeEmptyAttach(t *testing.T) {
testUrl := common.NewURLWithOptions(
common.WithParams(url.Values{}),
common.WithParamsValue(constant.TOKEN_KEY, "ori_key"))
- attch := make(map[string]string, 0)
+ attch := make(map[string]interface{}, 0)
result := filter.Invoke(context.Background(),
protocol.NewBaseInvoker(*testUrl), invocation.NewRPCInvocation("MethodName",
[]interface{}{"OK"}, attch))
assert.NotNil(t, result.Error())
}
@@ -78,7 +78,7 @@ func TestTokenFilterInvokeNotEqual(t *testing.T) {
testUrl := common.NewURLWithOptions(
common.WithParams(url.Values{}),
common.WithParamsValue(constant.TOKEN_KEY, "ori_key"))
- attch := make(map[string]string, 0)
+ attch := make(map[string]interface{}, 0)
attch[constant.TOKEN_KEY] = "err_key"
result := filter.Invoke(context.Background(),
protocol.NewBaseInvoker(*testUrl),
invocation.NewRPCInvocation("MethodName", []interface{}{"OK"}, attch))
diff --git a/filter/filter_impl/tps/tps_limiter_method_service_test.go
b/filter/filter_impl/tps/tps_limiter_method_service_test.go
index edae99e..61f28e4 100644
--- a/filter/filter_impl/tps/tps_limiter_method_service_test.go
+++ b/filter/filter_impl/tps/tps_limiter_method_service_test.go
@@ -36,7 +36,7 @@ import (
func TestMethodServiceTpsLimiterImplIsAllowableOnlyServiceLevel(t *testing.T) {
methodName := "hello"
- invoc := invocation.NewRPCInvocation(methodName, []interface{}{"OK"},
make(map[string]string, 0))
+ invoc := invocation.NewRPCInvocation(methodName, []interface{}{"OK"},
make(map[string]interface{}, 0))
ctrl := gomock.NewController(t)
defer ctrl.Finish()
@@ -63,7 +63,7 @@ func
TestMethodServiceTpsLimiterImplIsAllowableOnlyServiceLevel(t *testing.T) {
func TestMethodServiceTpsLimiterImplIsAllowableNoConfig(t *testing.T) {
methodName := "hello1"
- invoc := invocation.NewRPCInvocation(methodName, []interface{}{"OK"},
make(map[string]string, 0))
+ invoc := invocation.NewRPCInvocation(methodName, []interface{}{"OK"},
make(map[string]interface{}, 0))
// ctrl := gomock.NewController(t)
// defer ctrl.Finish()
@@ -80,7 +80,7 @@ func TestMethodServiceTpsLimiterImplIsAllowableNoConfig(t
*testing.T) {
func TestMethodServiceTpsLimiterImplIsAllowableMethodLevelOverride(t
*testing.T) {
methodName := "hello2"
methodConfigPrefix := "methods." + methodName + "."
- invoc := invocation.NewRPCInvocation(methodName, []interface{}{"OK"},
make(map[string]string, 0))
+ invoc := invocation.NewRPCInvocation(methodName, []interface{}{"OK"},
make(map[string]interface{}, 0))
ctrl := gomock.NewController(t)
defer ctrl.Finish()
@@ -113,7 +113,7 @@ func
TestMethodServiceTpsLimiterImplIsAllowableMethodLevelOverride(t *testing.T)
func TestMethodServiceTpsLimiterImplIsAllowableBothMethodAndService(t
*testing.T) {
methodName := "hello3"
methodConfigPrefix := "methods." + methodName + "."
- invoc := invocation.NewRPCInvocation(methodName, []interface{}{"OK"},
make(map[string]string, 0))
+ invoc := invocation.NewRPCInvocation(methodName, []interface{}{"OK"},
make(map[string]interface{}, 0))
ctrl := gomock.NewController(t)
defer ctrl.Finish()
diff --git a/filter/filter_impl/tps_limit_filter_test.go
b/filter/filter_impl/tps_limit_filter_test.go
index 274e4e6..da0fc48 100644
--- a/filter/filter_impl/tps_limit_filter_test.go
+++ b/filter/filter_impl/tps_limit_filter_test.go
@@ -44,7 +44,7 @@ func TestTpsLimitFilterInvokeWithNoTpsLimiter(t *testing.T) {
invokeUrl := common.NewURLWithOptions(
common.WithParams(url.Values{}),
common.WithParamsValue(constant.TPS_LIMITER_KEY, ""))
- attch := make(map[string]string, 0)
+ attch := make(map[string]interface{}, 0)
result := tpsFilter.Invoke(context.Background(),
protocol.NewBaseInvoker(*invokeUrl),
@@ -68,7 +68,7 @@ func TestGenericFilterInvokeWithDefaultTpsLimiter(t
*testing.T) {
invokeUrl := common.NewURLWithOptions(
common.WithParams(url.Values{}),
common.WithParamsValue(constant.TPS_LIMITER_KEY,
constant.DEFAULT_KEY))
- attch := make(map[string]string, 0)
+ attch := make(map[string]interface{}, 0)
result := tpsFilter.Invoke(context.Background(),
protocol.NewBaseInvoker(*invokeUrl),
@@ -99,7 +99,7 @@ func TestGenericFilterInvokeWithDefaultTpsLimiterNotAllow(t
*testing.T) {
invokeUrl := common.NewURLWithOptions(
common.WithParams(url.Values{}),
common.WithParamsValue(constant.TPS_LIMITER_KEY,
constant.DEFAULT_KEY))
- attch := make(map[string]string, 0)
+ attch := make(map[string]interface{}, 0)
result := tpsFilter.Invoke(context.Background(),
protocol.NewBaseInvoker(*invokeUrl),
invocation.NewRPCInvocation("MethodName", []interface{}{"OK"}, attch))
diff --git a/filter/filter_impl/tracing_filter_test.go
b/filter/filter_impl/tracing_filter_test.go
index 57f4095..e159b74 100644
--- a/filter/filter_impl/tracing_filter_test.go
+++ b/filter/filter_impl/tracing_filter_test.go
@@ -42,7 +42,7 @@ func TestTracingFilterInvoke(t *testing.T) {
"service.filter=echo%2Ctoken%2Caccesslog×tamp=1569153406&token=934804bf-b007-4174-94eb-96e3e1d60cc7&version=&warmup=100")
invoker := protocol.NewBaseInvoker(url)
- attach := make(map[string]string, 10)
+ attach := make(map[string]interface{}, 10)
inv := invocation.NewRPCInvocation("MethodName", []interface{}{"OK",
"Hello"}, attach)
ctx := context.Background()
tf := newTracingFilter()
diff --git a/go.mod b/go.mod
index a26aec1..9e18cc2 100644
--- a/go.mod
+++ b/go.mod
@@ -7,7 +7,7 @@ require (
github.com/Workiva/go-datastructures v1.0.50
github.com/afex/hystrix-go v0.0.0-20180502004556-fa1af6a1f4f5
github.com/apache/dubbo-getty v1.3.10
- github.com/apache/dubbo-go-hessian2 v1.6.2
+ github.com/apache/dubbo-go-hessian2
v1.6.0-rc1.0.20200906044240-6c1fb5c3bd44
github.com/coreos/bbolt v1.3.3 // indirect
github.com/coreos/etcd v3.3.13+incompatible
github.com/coreos/go-systemd v0.0.0-20190719114852-fd7a80b32e1f //
indirect
diff --git a/go.sum b/go.sum
index c5ab85d..f744fc3 100644
--- a/go.sum
+++ b/go.sum
@@ -74,8 +74,8 @@ github.com/aliyun/alibaba-cloud-sdk-go v1.61.18
h1:zOVTBdCKFd9JbCKz9/nt+FovbjPFm
github.com/aliyun/alibaba-cloud-sdk-go v1.61.18/go.mod
h1:v8ESoHo4SyHmuB4b1tJqDHxfTGEciD+yhvOU/5s1Rfk=
github.com/apache/dubbo-getty v1.3.10
h1:ys5mwjPdxG/KwkPjS6EI0RzQtU6p6FCPoKpaFEzpAL0=
github.com/apache/dubbo-getty v1.3.10/go.mod
h1:x6rraK01BL5C7jUM2fPl5KMkAxLVIx54ZB8/XEOik9Y=
-github.com/apache/dubbo-go-hessian2 v1.6.2
h1:i7F5GjVaUatLQz1x9vUmmSIFj49L8J6rVICdF6xw4qw=
-github.com/apache/dubbo-go-hessian2 v1.6.2/go.mod
h1:7rEw9guWABQa6Aqb8HeZcsYPHsOS7XT1qtJvkmI6c5w=
+github.com/apache/dubbo-go-hessian2 v1.6.0-rc1.0.20200906044240-6c1fb5c3bd44
h1:9biQu3Z0PjDN1m8h6poo76dFkvaIpfryUVpJ5VsYVrM=
+github.com/apache/dubbo-go-hessian2
v1.6.0-rc1.0.20200906044240-6c1fb5c3bd44/go.mod
h1:7rEw9guWABQa6Aqb8HeZcsYPHsOS7XT1qtJvkmI6c5w=
github.com/armon/circbuf v0.0.0-20150827004946-bbbad097214e
h1:QEF07wC0T1rKkctt1RINW/+RMTVmiwxETico2l3gxJA=
github.com/armon/circbuf v0.0.0-20150827004946-bbbad097214e/go.mod
h1:3U/XgcO3hCbHZ8TKRvWD2dDTCfh9M9ya+I9JpbB7O8o=
github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod
h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8=
@@ -150,7 +150,6 @@ github.com/docker/go-connections v0.4.0/go.mod
h1:Gbd7IOopHjR8Iph03tsViu4nIes5Xh
github.com/docker/spdystream v0.0.0-20160310174837-449fdfce4d96/go.mod
h1:Qh8CwZgvJUkLughtfhJv5dyTYa91l1fOUCrgjqmcifM=
github.com/dubbogo/go-zookeeper v1.0.1
h1:irLzvOsDOTNsN8Sv9tvYYxVu6DCQfLtziZQtUHmZgz8=
github.com/dubbogo/go-zookeeper v1.0.1/go.mod
h1:fn6n2CAEer3novYgk9ULLwAjuV8/g4DdC2ENwRb6E+c=
-github.com/dubbogo/gost v1.9.0 h1:UT+dWwvLyJiDotxJERO75jB3Yxgsdy10KztR5ycxRAk=
github.com/dubbogo/gost v1.9.0/go.mod
h1:pPTjVyoJan3aPxBPNUX0ADkXjPibLo+/Ib0/fADXSG8=
github.com/dubbogo/gost v1.9.1 h1:0/PPFo13zPbjt4Ia0zYWMFi3C6rAe9X7O1J2Iv+BHNM=
github.com/dubbogo/gost v1.9.1/go.mod
h1:pPTjVyoJan3aPxBPNUX0ADkXjPibLo+/Ib0/fADXSG8=
@@ -805,13 +804,11 @@ golang.org/x/tools
v0.0.0-20190907020128-2ca718005c18/go.mod h1:b+2E5dAYhXwXZwtn
golang.org/x/tools v0.0.0-20190911174233-4f2ddba30aff/go.mod
h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20191012152004-8de300cfc20a/go.mod
h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20191029041327-9cc4af7d6b2c/go.mod
h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
-golang.org/x/tools v0.0.0-20191029190741-b9c20aec41a5
h1:hKsoRgsbwY1NafxrwTs+k64bikrLBkAgPir1TNCj3Zs=
golang.org/x/tools v0.0.0-20191029190741-b9c20aec41a5/go.mod
h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20191112195655-aa38f8e97acc
h1:NCy3Ohtk6Iny5V/reW2Ktypo4zIpWBdRJ1uFMjBxdg8=
golang.org/x/tools v0.0.0-20191112195655-aa38f8e97acc/go.mod
h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod
h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/api v0.4.0/go.mod
h1:8k5glujaEP+g9n7WNsDg8QP6cUVNI86fCNMcbazEtwE=
-google.golang.org/api v0.7.0 h1:9sdfJOzWlkqPltHAuzT2Cp+yrBeY1KRVYgms8soxMwM=
google.golang.org/api v0.7.0/go.mod
h1:WtwebWUNSVBH/HAw79HIFXZNqEvBhG+Ra+ax0hx3E3M=
google.golang.org/api v0.8.0/go.mod
h1:o4eAsZoiT+ibD93RtjEohWalFOjRDx6CVaqeizhEnKg=
google.golang.org/api v0.9.0/go.mod
h1:o4eAsZoiT+ibD93RtjEohWalFOjRDx6CVaqeizhEnKg=
@@ -820,7 +817,6 @@ google.golang.org/api v0.13.0/go.mod
h1:iLdEw5Ide6rF15KTC1Kkl0iskquN2gFfn9o9XIsb
google.golang.org/appengine v1.1.0/go.mod
h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM=
google.golang.org/appengine v1.4.0/go.mod
h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4=
google.golang.org/appengine v1.5.0/go.mod
h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4=
-google.golang.org/appengine v1.6.0
h1:Tfd7cKwKbFRsI8RMAD3oqqw7JPFRrvFlOsfbgVkjOOw=
google.golang.org/appengine v1.6.0/go.mod
h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4=
google.golang.org/appengine v1.6.1
h1:QzqyMA1tlu6CgqCDUtU9V+ZKhLFT2dkJuANu5QaxI3I=
google.golang.org/appengine v1.6.1/go.mod
h1:i06prIuMbXzDqacNJfV5OdTW448YApPu5ww/cMBSeb0=
@@ -831,7 +827,6 @@ google.golang.org/genproto
v0.0.0-20190418145605-e7d98fc518a7/go.mod h1:VzzqZJRn
google.golang.org/genproto v0.0.0-20190425155659-357c62f0e4bb/go.mod
h1:VzzqZJRnGkLBvHegQrXjBqPurQTc5/KpmUdxsrq26oE=
google.golang.org/genproto v0.0.0-20190502173448-54afdca5d873/go.mod
h1:VzzqZJRnGkLBvHegQrXjBqPurQTc5/KpmUdxsrq26oE=
google.golang.org/genproto v0.0.0-20190530194941-fb225487d101/go.mod
h1:z3L6/3dTEVtUr6QSP8miRzeRqwQOioJ9I66odjN4I7s=
-google.golang.org/genproto v0.0.0-20190801165951-fa694d86fc64
h1:iKtrH9Y8mcbADOP0YFaEMth7OfuHY9xHOwNj4znpM1A=
google.golang.org/genproto v0.0.0-20190801165951-fa694d86fc64/go.mod
h1:DMBHOl98Agz4BDEuKkezgsaosCRResVns1a3J2ZsMNc=
google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55/go.mod
h1:DMBHOl98Agz4BDEuKkezgsaosCRResVns1a3J2ZsMNc=
google.golang.org/genproto v0.0.0-20190911173649-1774047e7e51/go.mod
h1:IbNlFCBrqXvoKpeg0TB2l7cyZUmoaFKYIwrEpbDKLA8=
@@ -859,7 +854,6 @@ gopkg.in/fsnotify.v1 v1.4.7/go.mod
h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMy
gopkg.in/gemnasium/logrus-airbrake-hook.v2 v2.1.2/go.mod
h1:Xk6kEKp8OKb+X14hQBKWaSkCsqBpgog8nAV2xsGOxlo=
gopkg.in/inf.v0 v0.9.1 h1:73M5CoZyi3ZLMOyDlQh031Cx6N9NDJ2Vvfl76EDAgDc=
gopkg.in/inf.v0 v0.9.1/go.mod h1:cWUDdTG/fYaXco+Dcufb5Vnc6Gp2YChqWtbxRZE0mXw=
-gopkg.in/ini.v1 v1.42.0 h1:7N3gPTt50s8GuLortA00n8AqRTk75qOP98+mTPpgzRk=
gopkg.in/ini.v1 v1.42.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k=
gopkg.in/ini.v1 v1.51.0 h1:AQvPpx3LzTDM0AjnIRlVFwFFGC+npRopjZxLJj6gdno=
gopkg.in/ini.v1 v1.51.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k=
diff --git a/metadata/service/inmemory/service_proxy.go
b/metadata/service/inmemory/service_proxy.go
index 7e01439..e2b2968 100644
--- a/metadata/service/inmemory/service_proxy.go
+++ b/metadata/service/inmemory/service_proxy.go
@@ -55,7 +55,7 @@ func (m *MetadataServiceProxy)
GetExportedURLs(serviceInterface string, group st
inv :=
invocation.NewRPCInvocationWithOptions(invocation.WithMethodName(methodName),
invocation.WithArguments([]interface{}{siV.Interface(),
gV.Interface(), vV.Interface(), pV.Interface()}),
invocation.WithReply(reflect.ValueOf(&[]interface{}{}).Interface()),
-
invocation.WithAttachments(map[string]string{constant.ASYNC_KEY: "false"}),
+
invocation.WithAttachments(map[string]interface{}{constant.ASYNC_KEY: "false"}),
invocation.WithParameterValues([]reflect.Value{siV, gV, vV,
pV}))
res := m.invkr.Invoke(context.Background(), inv)
diff --git a/metrics/prometheus/reporter_test.go
b/metrics/prometheus/reporter_test.go
index 0cb7d09..eaba0e3 100644
--- a/metrics/prometheus/reporter_test.go
+++ b/metrics/prometheus/reporter_test.go
@@ -43,7 +43,7 @@ func TestPrometheusReporter_Report(t *testing.T) {
"service.filter=echo%2Ctoken%2Caccesslog×tamp=1569153406&token=934804bf-b007-4174-94eb-96e3e1d60cc7&version=&warmup=100")
invoker := protocol.NewBaseInvoker(url)
- attach := make(map[string]string, 10)
+ attach := make(map[string]interface{}, 10)
inv := invocation.NewRPCInvocation("MethodName", []interface{}{"OK",
"Hello"}, attach)
assert.False(t, isConsumer(url))
diff --git a/protocol/dubbo/client.go b/protocol/dubbo/client.go
index 530beba..b6e4618 100644
--- a/protocol/dubbo/client.go
+++ b/protocol/dubbo/client.go
@@ -26,7 +26,6 @@ import (
import (
"github.com/apache/dubbo-getty"
- hessian "github.com/apache/dubbo-go-hessian2"
gxsync "github.com/dubbogo/gost/sync"
perrors "github.com/pkg/errors"
"go.uber.org/atomic"
@@ -38,6 +37,7 @@ import (
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/common/logger"
"github.com/apache/dubbo-go/config"
+ "github.com/apache/dubbo-go/protocol/dubbo/hessian2"
)
var (
@@ -173,11 +173,11 @@ type Request struct {
svcUrl common.URL
method string
args interface{}
- atta map[string]string
+ atta map[string]interface{}
}
// NewRequest create a new Request.
-func NewRequest(addr string, svcUrl common.URL, method string, args
interface{}, atta map[string]string) *Request {
+func NewRequest(addr string, svcUrl common.URL, method string, args
interface{}, atta map[string]interface{}) *Request {
return &Request{
addr: addr,
svcUrl: svcUrl,
@@ -190,11 +190,11 @@ func NewRequest(addr string, svcUrl common.URL, method
string, args interface{},
// Response is dubbo protocol response.
type Response struct {
reply interface{}
- atta map[string]string
+ atta map[string]interface{}
}
// NewResponse creates a new Response.
-func NewResponse(reply interface{}, atta map[string]string) *Response {
+func NewResponse(reply interface{}, atta map[string]interface{}) *Response {
return &Response{
reply: reply,
atta: atta,
@@ -240,16 +240,16 @@ func (c *Client) call(ct CallType, request *Request,
response *Response, callbac
}
p.Header.SerialID = byte(S_Dubbo)
- p.Body = hessian.NewRequest(request.args, request.atta)
+ p.Body = hessian2.NewRequest(request.args, request.atta)
var rsp *PendingResponse
if ct != CT_OneWay {
- p.Header.Type = hessian.PackageRequest_TwoWay
+ p.Header.Type = hessian2.PackageRequest_TwoWay
rsp = NewPendingResponse()
rsp.response = response
rsp.callback = callback
} else {
- p.Header.Type = hessian.PackageRequest
+ p.Header.Type = hessian2.PackageRequest
}
var (
@@ -323,9 +323,9 @@ func (c *Client) transfer(session getty.Session, pkg
*DubboPackage,
if pkg == nil {
pkg = &DubboPackage{}
- pkg.Body = hessian.NewRequest([]interface{}{}, nil)
+ pkg.Body = hessian2.NewRequest([]interface{}{}, nil)
pkg.Body = []interface{}{}
- pkg.Header.Type = hessian.PackageHeartbeat
+ pkg.Header.Type = hessian2.PackageHeartbeat
pkg.Header.SerialID = byte(S_Dubbo)
}
pkg.Header.ID = int64(sequence)
diff --git a/protocol/dubbo/client_test.go b/protocol/dubbo/client_test.go
index 8b0ba16..a3b194a 100644
--- a/protocol/dubbo/client_test.go
+++ b/protocol/dubbo/client_test.go
@@ -20,6 +20,7 @@ package dubbo
import (
"bytes"
"context"
+ "fmt"
"sync"
"testing"
"time"
@@ -129,6 +130,15 @@ func TestClientCall(t *testing.T) {
assert.NoError(t, err)
assert.Equal(t, User{Id: "1", Name: ""}, *user)
+ user = &User{}
+ r1 := "v1"
+ r2 := &AttaTestObject{Ti: 45, Desc: "v2"}
+ err = c.Call(NewRequest(mockAddress, url, "GetUserForAttachment",
[]interface{}{1}, map[string]interface{}{"sim": r1, "comp": r2}),
NewResponse(user, nil))
+ assert.NoError(t, err)
+ // the param is transfered from client to server by attachment, and the
server will return the attachment value.
+ // the test should check the value came back from server.
+ assert.Equal(t, User{Id: r1, Name: fmt.Sprintf("%+v", *r2)}, *user)
+
// destroy
proto.Destroy()
}
@@ -166,10 +176,11 @@ func TestClientAsyncCall(t *testing.T) {
func InitTest(t *testing.T) (protocol.Protocol, common.URL) {
hessian.RegisterPOJO(&User{})
+ hessian.RegisterPOJO(&AttaTestObject{})
methods, err :=
common.ServiceMap.Register("com.ikurento.user.UserProvider", "dubbo",
&UserProvider{})
assert.NoError(t, err)
- assert.Equal(t,
"GetBigPkg,GetUser,GetUser0,GetUser1,GetUser2,GetUser3,GetUser4,GetUser5,GetUser6",
methods)
+ assert.Equal(t,
"GetBigPkg,GetUser,GetUser0,GetUser1,GetUser2,GetUser3,GetUser4,GetUser5,GetUser6,GetUserForAttachment",
methods)
// config
SetClientConf(ClientConfig{
@@ -296,6 +307,16 @@ func (u *UserProvider) GetUser6(id int64) (*User, error) {
return &User{Id: "1"}, nil
}
+func (u *UserProvider) GetUserForAttachment(context context.Context, id int64)
(*User, error) {
+ if id == 0 {
+ return nil, nil
+ }
+ var attachments = context.Value("attachment").(map[string]interface{})
+ Id := attachments["sim"].(string)
+ name := fmt.Sprintf("%+v", *(attachments["comp"].(*AttaTestObject)))
+ return &User{Id: Id, Name: name}, nil
+}
+
func (u *UserProvider) Reference() string {
return "UserProvider"
}
@@ -303,3 +324,12 @@ func (u *UserProvider) Reference() string {
func (u User) JavaClassName() string {
return "com.ikurento.user.User"
}
+
+type AttaTestObject struct {
+ Ti int64
+ Desc string
+}
+
+func (u *AttaTestObject) JavaClassName() string {
+ return "UserProvider"
+}
diff --git a/protocol/dubbo/codec.go b/protocol/dubbo/codec.go
index 9781c70..c33c92d 100644
--- a/protocol/dubbo/codec.go
+++ b/protocol/dubbo/codec.go
@@ -25,11 +25,14 @@ import (
)
import (
- "github.com/apache/dubbo-go-hessian2"
"github.com/apache/dubbo-go/common"
perrors "github.com/pkg/errors"
)
+import (
+ "github.com/apache/dubbo-go/protocol/dubbo/hessian2"
+)
+
//SerialID serial ID
type SerialID byte
@@ -59,8 +62,8 @@ type SequenceType int64
// nolint
type DubboPackage struct {
- Header hessian.DubboHeader
- Service hessian.Service
+ Header hessian2.DubboHeader
+ Service hessian2.Service
Body interface{}
Err error
}
@@ -72,7 +75,7 @@ func (p DubboPackage) String() string {
// Marshal encode hessian package.
func (p *DubboPackage) Marshal() (*bytes.Buffer, error) {
- codec := hessian.NewHessianCodec(nil)
+ codec := hessian2.NewHessianCodec(nil)
pkg, err := codec.Write(p.Service, p.Header, p.Body)
if err != nil {
@@ -86,11 +89,11 @@ func (p *DubboPackage) Marshal() (*bytes.Buffer, error) {
func (p *DubboPackage) Unmarshal(buf *bytes.Buffer, opts ...interface{}) error
{
// fix issue https://github.com/apache/dubbo-go/issues/380
bufLen := buf.Len()
- if bufLen < hessian.HEADER_LENGTH {
- return perrors.WithStack(hessian.ErrHeaderNotEnough)
+ if bufLen < hessian2.HEADER_LENGTH {
+ return perrors.WithStack(hessian2.ErrHeaderNotEnough)
}
- codec := hessian.NewHessianCodec(bufio.NewReaderSize(buf, bufLen))
+ codec := hessian2.NewHessianCodec(bufio.NewReaderSize(buf, bufLen))
// read header
err := codec.ReadHeader(&p.Header)
@@ -104,7 +107,7 @@ func (p *DubboPackage) Unmarshal(buf *bytes.Buffer, opts
...interface{}) error {
return perrors.Errorf("opts[0] is not of type *Client")
}
- if p.Header.Type&hessian.PackageRequest != 0x00 {
+ if p.Header.Type&hessian2.PackageRequest != 0x00 {
// size of this array must be '7'
//
https://github.com/apache/dubbo-go-hessian2/blob/master/request.go#L272
p.Body = make([]interface{}, 7)
@@ -113,7 +116,7 @@ func (p *DubboPackage) Unmarshal(buf *bytes.Buffer, opts
...interface{}) error {
if !ok {
return
perrors.Errorf("client.GetPendingResponse(%v) = nil", p.Header.ID)
}
- p.Body = &hessian.Response{RspObj:
pendingRsp.(*PendingResponse).response.reply}
+ p.Body = &hessian2.DubboResponse{RspObj:
pendingRsp.(*PendingResponse).response.reply}
}
}
diff --git a/protocol/dubbo/codec_test.go b/protocol/dubbo/codec_test.go
index c2ca443..6dfb87e 100644
--- a/protocol/dubbo/codec_test.go
+++ b/protocol/dubbo/codec_test.go
@@ -24,15 +24,18 @@ import (
)
import (
- hessian "github.com/apache/dubbo-go-hessian2"
perrors "github.com/pkg/errors"
"github.com/stretchr/testify/assert"
)
+import (
+ "github.com/apache/dubbo-go/protocol/dubbo/hessian2"
+)
+
func TestDubboPackageMarshalAndUnmarshal(t *testing.T) {
pkg := &DubboPackage{}
pkg.Body = []interface{}{"a"}
- pkg.Header.Type = hessian.PackageHeartbeat
+ pkg.Header.Type = hessian2.PackageHeartbeat
pkg.Header.SerialID = byte(S_Dubbo)
pkg.Header.ID = 10086
@@ -44,13 +47,13 @@ func TestDubboPackageMarshalAndUnmarshal(t *testing.T) {
pkgres.Body = []interface{}{}
err = pkgres.Unmarshal(data)
assert.NoError(t, err)
- assert.Equal(t,
hessian.PackageHeartbeat|hessian.PackageRequest|hessian.PackageRequest_TwoWay,
pkgres.Header.Type)
+ assert.Equal(t,
hessian2.PackageHeartbeat|hessian2.PackageRequest|hessian2.PackageRequest_TwoWay,
pkgres.Header.Type)
assert.Equal(t, byte(S_Dubbo), pkgres.Header.SerialID)
assert.Equal(t, int64(10086), pkgres.Header.ID)
assert.Equal(t, 0, len(pkgres.Body.([]interface{})))
// request
- pkg.Header.Type = hessian.PackageRequest
+ pkg.Header.Type = hessian2.PackageRequest
pkg.Service.Interface = "Service"
pkg.Service.Path = "path"
pkg.Service.Version = "2.6"
@@ -63,7 +66,7 @@ func TestDubboPackageMarshalAndUnmarshal(t *testing.T) {
pkgres.Body = make([]interface{}, 7)
err = pkgres.Unmarshal(data)
assert.NoError(t, err)
- assert.Equal(t, hessian.PackageRequest, pkgres.Header.Type)
+ assert.Equal(t, hessian2.PackageRequest, pkgres.Header.Type)
assert.Equal(t, byte(S_Dubbo), pkgres.Header.SerialID)
assert.Equal(t, int64(10086), pkgres.Header.ID)
assert.Equal(t, "2.0.2", pkgres.Body.([]interface{})[0])
@@ -72,12 +75,12 @@ func TestDubboPackageMarshalAndUnmarshal(t *testing.T) {
assert.Equal(t, "Method", pkgres.Body.([]interface{})[3])
assert.Equal(t, "Ljava/lang/String;", pkgres.Body.([]interface{})[4])
assert.Equal(t, []interface{}{"a"}, pkgres.Body.([]interface{})[5])
- assert.Equal(t, map[string]string{"dubbo": "2.0.2", "interface":
"Service", "path": "path", "timeout": "1000", "version": "2.6"},
pkgres.Body.([]interface{})[6])
+ assert.Equal(t, map[string]interface{}{"dubbo": "2.0.2", "interface":
"Service", "path": "path", "timeout": "1000", "version": "2.6"},
pkgres.Body.([]interface{})[6])
}
func TestIssue380(t *testing.T) {
pkg := &DubboPackage{}
buf := bytes.NewBuffer([]byte("hello"))
err := pkg.Unmarshal(buf)
- assert.True(t, perrors.Cause(err) == hessian.ErrHeaderNotEnough)
+ assert.True(t, perrors.Cause(err) == hessian2.ErrHeaderNotEnough)
}
diff --git a/protocol/dubbo/dubbo_invoker.go b/protocol/dubbo/dubbo_invoker.go
index 59202d5..983a05d 100644
--- a/protocol/dubbo/dubbo_invoker.go
+++ b/protocol/dubbo/dubbo_invoker.go
@@ -150,8 +150,7 @@ func (di *DubboInvoker) appendCtx(ctx context.Context, inv
*invocation_impl.RPCI
// inject opentracing ctx
currentSpan := opentracing.SpanFromContext(ctx)
if currentSpan != nil {
- carrier := opentracing.TextMapCarrier(inv.Attachments())
- err := opentracing.GlobalTracer().Inject(currentSpan.Context(),
opentracing.TextMap, carrier)
+ err := injectTraceCtx(currentSpan, inv)
if err != nil {
logger.Errorf("Could not inject the span context into
attachments: %v", err)
}
diff --git a/protocol/dubbo/dubbo_invoker_test.go
b/protocol/dubbo/dubbo_invoker_test.go
index c0640d5..bf352c0 100644
--- a/protocol/dubbo/dubbo_invoker_test.go
+++ b/protocol/dubbo/dubbo_invoker_test.go
@@ -52,7 +52,7 @@ func TestDubboInvokerInvoke(t *testing.T) {
user := &User{}
inv :=
invocation.NewRPCInvocationWithOptions(invocation.WithMethodName(mockMethodNameGetUser),
invocation.WithArguments([]interface{}{"1", "username"}),
- invocation.WithReply(user),
invocation.WithAttachments(map[string]string{"test_key": "test_value"}))
+ invocation.WithReply(user),
invocation.WithAttachments(map[string]interface{}{"test_key": "test_value"}))
// Call
res := invoker.Invoke(context.Background(), inv)
diff --git a/protocol/dubbo/hessian2/const.go b/protocol/dubbo/hessian2/const.go
new file mode 100644
index 0000000..74a00b6
--- /dev/null
+++ b/protocol/dubbo/hessian2/const.go
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// This constants are also defined in dubbo constants. But we will still used
these until hessian is stable.
+
+package hessian2
+
+import (
+ "reflect"
+ "regexp"
+)
+
+import (
+ perrors "github.com/pkg/errors"
+)
+
+const (
+ mask = byte(127)
+ flag = byte(128)
+)
+
+const (
+ // Zero : byte zero
+ Zero = byte(0x00)
+)
+
+// constansts
+const (
+ TAG_READ = int32(-1)
+ ASCII_GAP = 32
+ CHUNK_SIZE = 4096
+ BC_BINARY = byte('B') // final chunk
+ BC_BINARY_CHUNK = byte('A') // non-final chunk
+
+ BC_BINARY_DIRECT = byte(0x20) // 1-byte length binary
+ BINARY_DIRECT_MAX = byte(0x0f)
+ BC_BINARY_SHORT = byte(0x34) // 2-byte length binary
+ BINARY_SHORT_MAX = 0x3ff // 0-1023 binary
+
+ BC_DATE = byte(0x4a) // 64-bit millisecond UTC date
+ BC_DATE_MINUTE = byte(0x4b) // 32-bit minute UTC date
+
+ BC_DOUBLE = byte('D') // IEEE 64-bit double
+
+ BC_DOUBLE_ZERO = byte(0x5b)
+ BC_DOUBLE_ONE = byte(0x5c)
+ BC_DOUBLE_BYTE = byte(0x5d)
+ BC_DOUBLE_SHORT = byte(0x5e)
+ BC_DOUBLE_MILL = byte(0x5f)
+
+ BC_FALSE = byte('F') // boolean false
+
+ BC_INT = byte('I') // 32-bit int
+
+ INT_DIRECT_MIN = -0x10
+ INT_DIRECT_MAX = byte(0x2f)
+ BC_INT_ZERO = byte(0x90)
+
+ INT_BYTE_MIN = -0x800
+ INT_BYTE_MAX = 0x7ff
+ BC_INT_BYTE_ZERO = byte(0xc8)
+
+ BC_END = byte('Z')
+
+ INT_SHORT_MIN = -0x40000
+ INT_SHORT_MAX = 0x3ffff
+ BC_INT_SHORT_ZERO = byte(0xd4)
+
+ BC_LIST_VARIABLE = byte(0x55)
+ BC_LIST_FIXED = byte('V')
+ BC_LIST_VARIABLE_UNTYPED = byte(0x57)
+ BC_LIST_FIXED_UNTYPED = byte(0x58)
+ _listFixedTypedLenTagMin = byte(0x70)
+ _listFixedTypedLenTagMax = byte(0x77)
+ _listFixedUntypedLenTagMin = byte(0x78)
+ _listFixedUntypedLenTagMax = byte(0x7f)
+
+ BC_LIST_DIRECT = byte(0x70)
+ BC_LIST_DIRECT_UNTYPED = byte(0x78)
+ LIST_DIRECT_MAX = byte(0x7)
+
+ BC_LONG = byte('L') // 64-bit signed integer
+ LONG_DIRECT_MIN = -0x08
+ LONG_DIRECT_MAX = byte(0x0f)
+ BC_LONG_ZERO = byte(0xe0)
+
+ LONG_BYTE_MIN = -0x800
+ LONG_BYTE_MAX = 0x7ff
+ BC_LONG_BYTE_ZERO = byte(0xf8)
+
+ LONG_SHORT_MIN = -0x40000
+ LONG_SHORT_MAX = 0x3ffff
+ BC_LONG_SHORT_ZERO = byte(0x3c)
+
+ BC_LONG_INT = byte(0x59)
+
+ BC_MAP = byte('M')
+ BC_MAP_UNTYPED = byte('H')
+
+ BC_NULL = byte('N') // x4e
+
+ BC_OBJECT = byte('O')
+ BC_OBJECT_DEF = byte('C')
+
+ BC_OBJECT_DIRECT = byte(0x60)
+ OBJECT_DIRECT_MAX = byte(0x0f)
+
+ BC_REF = byte(0x51)
+
+ BC_STRING = byte('S') // final string
+ BC_STRING_CHUNK = byte('R') // non-final string
+
+ BC_STRING_DIRECT = byte(0x00)
+ STRING_DIRECT_MAX = byte(0x1f)
+ BC_STRING_SHORT = byte(0x30)
+ STRING_SHORT_MAX = 0x3ff
+
+ BC_TRUE = byte('T')
+
+ P_PACKET_CHUNK = byte(0x4f)
+ P_PACKET = byte('P')
+
+ P_PACKET_DIRECT = byte(0x80)
+ PACKET_DIRECT_MAX = byte(0x7f)
+
+ P_PACKET_SHORT = byte(0x70)
+ PACKET_SHORT_MAX = 0xfff
+ ARRAY_STRING = "[string"
+ ARRAY_INT = "[int"
+ ARRAY_DOUBLE = "[double"
+ ARRAY_FLOAT = "[float"
+ ARRAY_BOOL = "[boolean"
+ ARRAY_LONG = "[long"
+
+ PATH_KEY = "path"
+ GROUP_KEY = "group"
+ INTERFACE_KEY = "interface"
+ VERSION_KEY = "version"
+ TIMEOUT_KEY = "timeout"
+
+ STRING_NIL = ""
+ STRING_TRUE = "true"
+ STRING_FALSE = "false"
+ STRING_ZERO = "0.0"
+ STRING_ONE = "1.0"
+)
+
+// DubboResponse related consts
+const (
+ Response_OK byte = 20
+ Response_CLIENT_TIMEOUT byte = 30
+ Response_SERVER_TIMEOUT byte = 31
+ Response_BAD_REQUEST byte = 40
+ Response_BAD_RESPONSE byte = 50
+ Response_SERVICE_NOT_FOUND byte = 60
+ Response_SERVICE_ERROR byte = 70
+ Response_SERVER_ERROR byte = 80
+ Response_CLIENT_ERROR byte = 90
+
+ // According to "java dubbo" There are two cases of response:
+ // 1. with attachments
+ // 2. no attachments
+ RESPONSE_WITH_EXCEPTION int32 = 0
+ RESPONSE_VALUE int32 = 1
+ RESPONSE_NULL_VALUE int32 = 2
+ RESPONSE_WITH_EXCEPTION_WITH_ATTACHMENTS int32 = 3
+ RESPONSE_VALUE_WITH_ATTACHMENTS int32 = 4
+ RESPONSE_NULL_VALUE_WITH_ATTACHMENTS int32 = 5
+)
+
+/**
+ * the dubbo protocol header length is 16 Bytes.
+ * the first 2 Bytes is magic code '0xdabb'
+ * the next 1 Byte is message flags, in which its 16-20 bit is serial id, 21
for event, 22 for two way, 23 for request/response flag
+ * the next 1 Bytes is response state.
+ * the next 8 Bytes is package DI.
+ * the next 4 Bytes is package length.
+ **/
+const (
+ // header length.
+ HEADER_LENGTH = 16
+
+ // magic header
+ MAGIC = uint16(0xdabb)
+ MAGIC_HIGH = byte(0xda)
+ MAGIC_LOW = byte(0xbb)
+
+ // message flag.
+ FLAG_REQUEST = byte(0x80)
+ FLAG_TWOWAY = byte(0x40)
+ FLAG_EVENT = byte(0x20) // for heartbeat
+ SERIAL_MASK = 0x1f
+
+ DUBBO_VERSION = "2.5.4"
+ DUBBO_VERSION_KEY = "dubbo"
+ DEFAULT_DUBBO_PROTOCOL_VERSION = "2.0.2" // Dubbo RPC protocol
version, for compatibility, it must not be between 2.0.10 ~ 2.6.2
+ LOWEST_VERSION_FOR_RESPONSE_ATTACHMENT = 2000200
+ DEFAULT_LEN = 8388608 // 8 * 1024 * 1024
default body max length
+)
+
+// regular
+const (
+ JAVA_IDENT_REGEX = "(?:[_$a-zA-Z][_$a-zA-Z0-9]*)"
+ CLASS_DESC = "(?:L" + JAVA_IDENT_REGEX + "(?:\\/" +
JAVA_IDENT_REGEX + ")*;)"
+ ARRAY_DESC = "(?:\\[+(?:(?:[VZBCDFIJS])|" + CLASS_DESC + "))"
+ DESC_REGEX = "(?:(?:[VZBCDFIJS])|" + CLASS_DESC + "|" +
ARRAY_DESC + ")"
+)
+
+// Dubbo request response related consts
+var (
+ DubboRequestHeaderBytesTwoWay = [HEADER_LENGTH]byte{MAGIC_HIGH,
MAGIC_LOW, FLAG_REQUEST | FLAG_TWOWAY}
+ DubboRequestHeaderBytes = [HEADER_LENGTH]byte{MAGIC_HIGH,
MAGIC_LOW, FLAG_REQUEST}
+ DubboResponseHeaderBytes = [HEADER_LENGTH]byte{MAGIC_HIGH,
MAGIC_LOW, Zero, Response_OK}
+ DubboRequestHeartbeatHeader = [HEADER_LENGTH]byte{MAGIC_HIGH,
MAGIC_LOW, FLAG_REQUEST | FLAG_TWOWAY | FLAG_EVENT}
+ DubboResponseHeartbeatHeader = [HEADER_LENGTH]byte{MAGIC_HIGH,
MAGIC_LOW, FLAG_EVENT}
+)
+
+// Error part
+var (
+ ErrHeaderNotEnough = perrors.New("header buffer too short")
+ ErrBodyNotEnough = perrors.New("body buffer too short")
+ ErrJavaException = perrors.New("got java exception")
+ ErrIllegalPackage = perrors.New("illegal package!")
+)
+
+// nolint
+var DescRegex, _ = regexp.Compile(DESC_REGEX)
+
+var NilValue = reflect.Zero(reflect.TypeOf((*interface{})(nil)).Elem())
diff --git a/protocol/dubbo/hessian2/hessian_dubbo.go
b/protocol/dubbo/hessian2/hessian_dubbo.go
new file mode 100644
index 0000000..1afa4ec
--- /dev/null
+++ b/protocol/dubbo/hessian2/hessian_dubbo.go
@@ -0,0 +1,251 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package hessian2
+
+import (
+ "bufio"
+ "encoding/binary"
+ "time"
+)
+
+import (
+ "github.com/apache/dubbo-go-hessian2"
+ perrors "github.com/pkg/errors"
+)
+
+// enum part
+const (
+ PackageError = PackageType(0x01)
+ PackageRequest = PackageType(0x02)
+ PackageResponse = PackageType(0x04)
+ PackageHeartbeat = PackageType(0x08)
+ PackageRequest_TwoWay = PackageType(0x10)
+ PackageResponse_Exception = PackageType(0x20)
+ PackageType_BitSize = 0x2f
+)
+
+// PackageType nolint
+type PackageType int
+
+// DubboHeader dubbo header
+type DubboHeader struct {
+ SerialID byte
+ Type PackageType
+ ID int64
+ BodyLen int
+ ResponseStatus byte
+}
+
+// Service defines service instance
+type Service struct {
+ Path string
+ Interface string
+ Group string
+ Version string
+ Method string
+ Timeout time.Duration // request timeout
+}
+
+// HessianCodec defines hessian codec
+type HessianCodec struct {
+ pkgType PackageType
+ reader *bufio.Reader
+ bodyLen int
+}
+
+// NewHessianCodec generate a new hessian codec instance
+func NewHessianCodec(reader *bufio.Reader) *HessianCodec {
+ return &HessianCodec{
+ reader: reader,
+ }
+}
+
+// NewHessianCodec generate a new hessian codec instance
+func NewHessianCodecCustom(pkgType PackageType, reader *bufio.Reader, bodyLen
int) *HessianCodec {
+ return &HessianCodec{
+ pkgType: pkgType,
+ reader: reader,
+ bodyLen: bodyLen,
+ }
+}
+
+func (h *HessianCodec) Write(service Service, header DubboHeader, body
interface{}) ([]byte, error) {
+ switch header.Type {
+ case PackageHeartbeat:
+ if header.ResponseStatus == Zero {
+ return packRequest(service, header, body)
+ }
+ return packResponse(header, body)
+
+ case PackageRequest, PackageRequest_TwoWay:
+ return packRequest(service, header, body)
+
+ case PackageResponse:
+ return packResponse(header, body)
+
+ default:
+ return nil, perrors.Errorf("Unrecognised message type: %v",
header.Type)
+ }
+
+ // unreachable return nil, nil
+}
+
+// ReadHeader uses hessian codec to read dubbo header
+func (h *HessianCodec) ReadHeader(header *DubboHeader) error {
+
+ var err error
+
+ if h.reader.Size() < HEADER_LENGTH {
+ return ErrHeaderNotEnough
+ }
+ buf, err := h.reader.Peek(HEADER_LENGTH)
+ if err != nil { // this is impossible
+ return perrors.WithStack(err)
+ }
+ _, err = h.reader.Discard(HEADER_LENGTH)
+ if err != nil { // this is impossible
+ return perrors.WithStack(err)
+ }
+
+ //// read header
+
+ if buf[0] != MAGIC_HIGH && buf[1] != MAGIC_LOW {
+ return ErrIllegalPackage
+ }
+
+ // Header{serialization id(5 bit), event, two way, req/response}
+ if header.SerialID = buf[2] & SERIAL_MASK; header.SerialID == Zero {
+ return perrors.Errorf("serialization ID:%v", header.SerialID)
+ }
+
+ flag := buf[2] & FLAG_EVENT
+ if flag != Zero {
+ header.Type |= PackageHeartbeat
+ }
+ flag = buf[2] & FLAG_REQUEST
+ if flag != Zero {
+ header.Type |= PackageRequest
+ flag = buf[2] & FLAG_TWOWAY
+ if flag != Zero {
+ header.Type |= PackageRequest_TwoWay
+ }
+ } else {
+ header.Type |= PackageResponse
+ header.ResponseStatus = buf[3]
+ if header.ResponseStatus != Response_OK {
+ header.Type |= PackageResponse_Exception
+ }
+ }
+
+ // Header{req id}
+ header.ID = int64(binary.BigEndian.Uint64(buf[4:]))
+
+ // Header{body len}
+ header.BodyLen = int(binary.BigEndian.Uint32(buf[12:]))
+ if header.BodyLen < 0 {
+ return ErrIllegalPackage
+ }
+
+ h.pkgType = header.Type
+ h.bodyLen = header.BodyLen
+
+ if h.reader.Buffered() < h.bodyLen {
+ return ErrBodyNotEnough
+ }
+
+ return perrors.WithStack(err)
+
+}
+
+// ReadBody uses hessian codec to read response body
+func (h *HessianCodec) ReadBody(rspObj interface{}) error {
+
+ if h.reader.Buffered() < h.bodyLen {
+ return ErrBodyNotEnough
+ }
+ buf, err := h.reader.Peek(h.bodyLen)
+ if err != nil {
+ return perrors.WithStack(err)
+ }
+ _, err = h.reader.Discard(h.bodyLen)
+ if err != nil { // this is impossible
+ return perrors.WithStack(err)
+ }
+
+ switch h.pkgType & PackageType_BitSize {
+ case PackageResponse | PackageHeartbeat | PackageResponse_Exception,
PackageResponse | PackageResponse_Exception:
+ decoder := hessian.NewDecoder(buf[:])
+ exception, err := decoder.Decode()
+ if err != nil {
+ return perrors.WithStack(err)
+ }
+ rsp, ok := rspObj.(*DubboResponse)
+ if !ok {
+ return perrors.Errorf("java exception:%s",
exception.(string))
+ }
+ rsp.Exception = perrors.Errorf("java exception:%s",
exception.(string))
+ return nil
+ case PackageRequest | PackageHeartbeat, PackageResponse |
PackageHeartbeat:
+ case PackageRequest:
+ if rspObj != nil {
+ if err = unpackRequestBody(hessian.NewDecoder(buf[:]),
rspObj); err != nil {
+ return perrors.WithStack(err)
+ }
+ }
+ case PackageResponse:
+ if rspObj != nil {
+ if err = unpackResponseBody(hessian.NewDecoder(buf[:]),
rspObj); err != nil {
+ return perrors.WithStack(err)
+ }
+ }
+ }
+
+ return nil
+}
+
+// ignore body, but only read attachments
+func (h *HessianCodec) ReadAttachments() (map[string]interface{}, error) {
+ if h.reader.Buffered() < h.bodyLen {
+ return nil, ErrBodyNotEnough
+ }
+ buf, err := h.reader.Peek(h.bodyLen)
+ if err != nil {
+ return nil, perrors.WithStack(err)
+ }
+ _, err = h.reader.Discard(h.bodyLen)
+ if err != nil { // this is impossible
+ return nil, perrors.WithStack(err)
+ }
+
+ switch h.pkgType & PackageType_BitSize {
+ case PackageRequest:
+ rspObj := make([]interface{}, 7)
+ if err = unpackRequestBody(hessian.NewDecoderWithSkip(buf[:]),
rspObj); err != nil {
+ return nil, perrors.WithStack(err)
+ }
+ return rspObj[6].(map[string]interface{}), nil
+ case PackageResponse:
+ rspObj := &DubboResponse{}
+ if err = unpackResponseBody(hessian.NewDecoderWithSkip(buf[:]),
rspObj); err != nil {
+ return nil, perrors.WithStack(err)
+ }
+ return rspObj.Attachments, nil
+ }
+
+ return nil, nil
+}
diff --git a/protocol/dubbo/hessian2/hessian_dubbo_test.go
b/protocol/dubbo/hessian2/hessian_dubbo_test.go
new file mode 100644
index 0000000..c3f19f0
--- /dev/null
+++ b/protocol/dubbo/hessian2/hessian_dubbo_test.go
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package hessian2
+
+import (
+ "bufio"
+ "bytes"
+ "reflect"
+ "testing"
+ "time"
+)
+
+import (
+ hessian "github.com/apache/dubbo-go-hessian2"
+ "github.com/stretchr/testify/assert"
+)
+
+type Case struct {
+ A string
+ B int
+}
+
+type CaseA struct {
+ A string
+ B int
+ C Case
+}
+
+type CaseB struct {
+ A string
+ B CaseA
+}
+
+func (c *CaseB) JavaClassName() string {
+ return "com.test.caseb"
+}
+
+func (c CaseA) JavaClassName() string {
+ return "com.test.casea"
+}
+
+//JavaClassName java fully qualified path
+func (c Case) JavaClassName() string {
+ return "com.test.case"
+}
+
+func doTestHessianEncodeHeader(t *testing.T, packageType PackageType,
responseStatus byte, body interface{}) ([]byte, error) {
+ hessian.RegisterPOJO(&Case{})
+ codecW := NewHessianCodec(nil)
+ resp, err := codecW.Write(Service{
+ Path: "test",
+ Interface: "ITest",
+ Version: "v1.0",
+ Method: "test",
+ Timeout: time.Second * 10,
+ }, DubboHeader{
+ SerialID: 2,
+ Type: packageType,
+ ID: 1,
+ ResponseStatus: responseStatus,
+ }, body)
+ assert.Nil(t, err)
+ return resp, err
+}
+
+func doTestResponse(t *testing.T, packageType PackageType, responseStatus
byte, body interface{}, decodedResponse *DubboResponse, assertFunc func()) {
+ resp, err := doTestHessianEncodeHeader(t, packageType, responseStatus,
body)
+
+ codecR := NewHessianCodec(bufio.NewReader(bytes.NewReader(resp)))
+
+ h := &DubboHeader{}
+ err = codecR.ReadHeader(h)
+ assert.Nil(t, err)
+
+ assert.Equal(t, byte(2), h.SerialID)
+ assert.Equal(t, packageType,
h.Type&(PackageRequest|PackageResponse|PackageHeartbeat))
+ assert.Equal(t, int64(1), h.ID)
+ assert.Equal(t, responseStatus, h.ResponseStatus)
+
+ err = codecR.ReadBody(decodedResponse)
+ assert.Nil(t, err)
+ t.Log(decodedResponse)
+
+ if assertFunc != nil {
+ assertFunc()
+ return
+ }
+
+ if h.ResponseStatus != Zero && h.ResponseStatus != Response_OK {
+ assert.Equal(t, "java exception:"+body.(string),
decodedResponse.Exception.Error())
+ return
+ }
+
+ in, _ :=
hessian.EnsureInterface(hessian.UnpackPtrValue(hessian.EnsurePackValue(body)),
nil)
+ out, _ :=
hessian.EnsureInterface(hessian.UnpackPtrValue(hessian.EnsurePackValue(decodedResponse.RspObj)),
nil)
+ assert.Equal(t, in, out)
+}
+
+func TestResponse(t *testing.T) {
+ caseObj := Case{A: "a", B: 1}
+ decodedResponse := &DubboResponse{}
+
+ arr := []*Case{&caseObj}
+ var arrRes []interface{}
+ decodedResponse.RspObj = &arrRes
+ doTestResponse(t, PackageResponse, Response_OK, arr, decodedResponse,
func() {
+ assert.Equal(t, 1, len(arrRes))
+ assert.Equal(t, &caseObj, arrRes[0])
+ })
+
+ decodedResponse.RspObj = &Case{}
+ doTestResponse(t, PackageResponse, Response_OK, &Case{A: "a", B: 1},
decodedResponse, nil)
+
+ s := "ok!!!!!"
+ strObj := ""
+ decodedResponse.RspObj = &strObj
+ doTestResponse(t, PackageResponse, Response_OK, s, decodedResponse, nil)
+
+ var intObj int64
+ decodedResponse.RspObj = &intObj
+ doTestResponse(t, PackageResponse, Response_OK, int64(3),
decodedResponse, nil)
+
+ boolObj := false
+ decodedResponse.RspObj = &boolObj
+ doTestResponse(t, PackageResponse, Response_OK, true, decodedResponse,
nil)
+
+ strObj = ""
+ decodedResponse.RspObj = &strObj
+ doTestResponse(t, PackageResponse, hessian.Response_SERVER_ERROR,
"error!!!!!", decodedResponse, nil)
+
+ mapObj := map[string][]*Case{"key": {&caseObj}}
+ mapRes := map[interface{}]interface{}{}
+ decodedResponse.RspObj = &mapRes
+ doTestResponse(t, PackageResponse, Response_OK, mapObj,
decodedResponse, func() {
+ c, ok := mapRes["key"]
+ if !ok {
+ assert.FailNow(t, "no key in decoded response map")
+ }
+
+ mapValueArr, ok := c.([]*Case)
+ if !ok {
+ assert.FailNow(t, "invalid decoded response map value",
"expect []*Case, but get %v", reflect.TypeOf(c))
+ }
+ assert.Equal(t, 1, len(mapValueArr))
+ assert.Equal(t, &caseObj, mapValueArr[0])
+ })
+}
+
+func doTestRequest(t *testing.T, packageType PackageType, responseStatus byte,
body interface{}) {
+ resp, err := doTestHessianEncodeHeader(t, packageType, responseStatus,
body)
+
+ codecR := NewHessianCodec(bufio.NewReader(bytes.NewReader(resp)))
+
+ h := &DubboHeader{}
+ err = codecR.ReadHeader(h)
+ assert.Nil(t, err)
+ assert.Equal(t, byte(2), h.SerialID)
+ assert.Equal(t, packageType,
h.Type&(PackageRequest|PackageResponse|PackageHeartbeat))
+ assert.Equal(t, int64(1), h.ID)
+ assert.Equal(t, responseStatus, h.ResponseStatus)
+
+ c := make([]interface{}, 7)
+ err = codecR.ReadBody(c)
+ assert.Nil(t, err)
+ t.Log(c)
+ assert.True(t, len(body.([]interface{})) == len(c[5].([]interface{})))
+}
+
+func TestRequest(t *testing.T) {
+ doTestRequest(t, PackageRequest, Zero, []interface{}{"a"})
+ doTestRequest(t, PackageRequest, Zero, []interface{}{"a", 3})
+ doTestRequest(t, PackageRequest, Zero, []interface{}{"a", true})
+ doTestRequest(t, PackageRequest, Zero, []interface{}{"a", 3, true})
+ doTestRequest(t, PackageRequest, Zero, []interface{}{3.2, true})
+ doTestRequest(t, PackageRequest, Zero, []interface{}{"a", 3, true,
&Case{A: "a", B: 3}})
+ doTestRequest(t, PackageRequest, Zero, []interface{}{"a", 3, true,
[]*Case{{A: "a", B: 3}}})
+ doTestRequest(t, PackageRequest, Zero,
[]interface{}{map[string][]*Case{"key": {{A: "a", B: 3}}}})
+}
+
+func TestHessianCodec_ReadAttachments(t *testing.T) {
+ hessian.RegisterPOJO(&AttachTestObject{})
+ body := &DubboResponse{
+ RspObj: &CaseB{A: "A", B: CaseA{A: "a", B: 1, C: Case{A:
"c", B: 2}}},
+ Exception: nil,
+ Attachments: map[string]interface{}{DUBBO_VERSION_KEY: "2.6.4",
"att": AttachTestObject{Id: 23, Name: "haha"}},
+ }
+ resp, err := doTestHessianEncodeHeader(t, PackageResponse, Response_OK,
body)
+ assert.NoError(t, err)
+ hessian.UnRegisterPOJOs(&CaseB{}, &CaseA{})
+ codecR1 := NewHessianCodec(bufio.NewReader(bytes.NewReader(resp)))
+ codecR2 := NewHessianCodec(bufio.NewReader(bytes.NewReader(resp)))
+ h := &DubboHeader{}
+ assert.NoError(t, codecR1.ReadHeader(h))
+ t.Log(h)
+ assert.NoError(t, codecR2.ReadHeader(h))
+ t.Log(h)
+
+ err = codecR1.ReadBody(body)
+ assert.Equal(t, "can not find go type name com.test.caseb in registry",
err.Error())
+ attrs, err := codecR2.ReadAttachments()
+ assert.NoError(t, err)
+ assert.Equal(t, "2.6.4", attrs[DUBBO_VERSION_KEY])
+ assert.Equal(t, AttachTestObject{Id: 23, Name: "haha"},
*(attrs["att"].(*AttachTestObject)))
+ assert.NotEqual(t, AttachTestObject{Id: 24, Name: "nohaha"},
*(attrs["att"].(*AttachTestObject)))
+
+ t.Log(attrs)
+}
+
+type AttachTestObject struct {
+ Id int32
+ Name string `dubbo:name`
+}
+
+func (AttachTestObject) JavaClassName() string {
+ return "com.test.Test"
+}
diff --git a/protocol/dubbo/hessian2/hessian_request.go
b/protocol/dubbo/hessian2/hessian_request.go
new file mode 100644
index 0000000..4ebb4aa
--- /dev/null
+++ b/protocol/dubbo/hessian2/hessian_request.go
@@ -0,0 +1,350 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package hessian2
+
+import (
+ "encoding/binary"
+ "reflect"
+ "strconv"
+ "strings"
+ "time"
+)
+
+import (
+ hessian "github.com/apache/dubbo-go-hessian2"
+ perrors "github.com/pkg/errors"
+)
+
+/////////////////////////////////////////
+// dubbo
+/////////////////////////////////////////
+
+func getArgType(v interface{}) string {
+ if v == nil {
+ return "V"
+ }
+
+ switch v.(type) {
+ // Serialized tags for base types
+ case nil:
+ return "V"
+ case bool:
+ return "Z"
+ case []bool:
+ return "[Z"
+ case byte:
+ return "B"
+ case []byte:
+ return "[B"
+ case int8:
+ return "B"
+ case []int8:
+ return "[B"
+ case int16:
+ return "S"
+ case []int16:
+ return "[S"
+ case uint16: // Equivalent to Char of Java
+ return "C"
+ case []uint16:
+ return "[C"
+ // case rune:
+ // return "C"
+ case int:
+ return "J"
+ case []int:
+ return "[J"
+ case int32:
+ return "I"
+ case []int32:
+ return "[I"
+ case int64:
+ return "J"
+ case []int64:
+ return "[J"
+ case time.Time:
+ return "java.util.Date"
+ case []time.Time:
+ return "[Ljava.util.Date"
+ case float32:
+ return "F"
+ case []float32:
+ return "[F"
+ case float64:
+ return "D"
+ case []float64:
+ return "[D"
+ case string:
+ return "java.lang.String"
+ case []string:
+ return "[Ljava.lang.String;"
+ case []hessian.Object:
+ return "[Ljava.lang.Object;"
+ case map[interface{}]interface{}:
+ // return "java.util.HashMap"
+ return "java.util.Map"
+ case hessian.POJOEnum:
+ return v.(hessian.POJOEnum).JavaClassName()
+ // Serialized tags for complex types
+ default:
+ t := reflect.TypeOf(v)
+ if reflect.Ptr == t.Kind() {
+ t = reflect.TypeOf(reflect.ValueOf(v).Elem())
+ }
+ switch t.Kind() {
+ case reflect.Struct:
+ return "java.lang.Object"
+ case reflect.Slice, reflect.Array:
+ if t.Elem().Kind() == reflect.Struct {
+ return "[Ljava.lang.Object;"
+ }
+ // return "java.util.ArrayList"
+ return "java.util.List"
+ case reflect.Map: // Enter here, map may be map[string]int
+ return "java.util.Map"
+ default:
+ return ""
+ }
+ }
+
+ // unreachable
+ // return "java.lang.RuntimeException"
+}
+
+func getArgsTypeList(args []interface{}) (string, error) {
+ var (
+ typ string
+ types string
+ )
+
+ for i := range args {
+ typ = getArgType(args[i])
+ if typ == "" {
+ return types, perrors.Errorf("cat not get arg %#v
type", args[i])
+ }
+ if !strings.Contains(typ, ".") {
+ types += typ
+ } else if strings.Index(typ, "[") == 0 {
+ types += strings.Replace(typ, ".", "/", -1)
+ } else {
+ // java.util.List -> Ljava/util/List;
+ types += "L" + strings.Replace(typ, ".", "/", -1) + ";"
+ }
+ }
+
+ return types, nil
+}
+
+type DubboRequest struct {
+ Params interface{}
+ Attachments map[string]interface{}
+}
+
+// NewRequest create a new DubboRequest
+func NewRequest(params interface{}, atta map[string]interface{}) *DubboRequest
{
+ if atta == nil {
+ atta = make(map[string]interface{})
+ }
+ return &DubboRequest{
+ Params: params,
+ Attachments: atta,
+ }
+}
+
+func EnsureRequest(body interface{}) *DubboRequest {
+ if req, ok := body.(*DubboRequest); ok {
+ return req
+ }
+ return NewRequest(body, nil)
+}
+
+func packRequest(service Service, header DubboHeader, req interface{})
([]byte, error) {
+ var (
+ err error
+ types string
+ byteArray []byte
+ pkgLen int
+ )
+
+ request := EnsureRequest(req)
+
+ args, ok := request.Params.([]interface{})
+ if !ok {
+ return nil, perrors.Errorf("@params is not of type:
[]interface{}")
+ }
+
+ hb := header.Type == PackageHeartbeat
+
+ //////////////////////////////////////////
+ // byteArray
+ //////////////////////////////////////////
+ // magic
+ switch header.Type {
+ case PackageHeartbeat:
+ byteArray = append(byteArray, DubboRequestHeartbeatHeader[:]...)
+ case PackageRequest_TwoWay:
+ byteArray = append(byteArray,
DubboRequestHeaderBytesTwoWay[:]...)
+ default:
+ byteArray = append(byteArray, DubboRequestHeaderBytes[:]...)
+ }
+
+ // serialization id, two way flag, event, request/response flag
+ // SerialID is id of serialization approach in java dubbo
+ byteArray[2] |= header.SerialID & SERIAL_MASK
+ // request id
+ binary.BigEndian.PutUint64(byteArray[4:], uint64(header.ID))
+
+ encoder := hessian.NewEncoder()
+ encoder.Append(byteArray[:HEADER_LENGTH])
+
+ //////////////////////////////////////////
+ // body
+ //////////////////////////////////////////
+ if hb {
+ encoder.Encode(nil)
+ goto END
+ }
+
+ // dubbo version + path + version + method
+ encoder.Encode(DEFAULT_DUBBO_PROTOCOL_VERSION)
+ encoder.Encode(service.Path)
+ encoder.Encode(service.Version)
+ encoder.Encode(service.Method)
+
+ // args = args type list + args value list
+ if types, err = getArgsTypeList(args); err != nil {
+ return nil, perrors.Wrapf(err, " PackRequest(args:%+v)", args)
+ }
+ encoder.Encode(types)
+ for _, v := range args {
+ encoder.Encode(v)
+ }
+
+ request.Attachments[PATH_KEY] = service.Path
+ request.Attachments[VERSION_KEY] = service.Version
+ if len(service.Group) > 0 {
+ request.Attachments[GROUP_KEY] = service.Group
+ }
+ if len(service.Interface) > 0 {
+ request.Attachments[INTERFACE_KEY] = service.Interface
+ }
+ if service.Timeout != 0 {
+ request.Attachments[TIMEOUT_KEY] =
strconv.Itoa(int(service.Timeout / time.Millisecond))
+ }
+
+ encoder.Encode(request.Attachments)
+
+END:
+ byteArray = encoder.Buffer()
+ pkgLen = len(byteArray)
+ if pkgLen > int(DEFAULT_LEN) { // 8M
+ return nil, perrors.Errorf("Data length %d too large, max
payload %d", pkgLen, DEFAULT_LEN)
+ }
+ // byteArray{body length}
+ binary.BigEndian.PutUint32(byteArray[12:], uint32(pkgLen-HEADER_LENGTH))
+ return byteArray, nil
+}
+
+// hessian decode request body
+func unpackRequestBody(decoder *hessian.Decoder, reqObj interface{}) error {
+
+ if decoder == nil {
+ return perrors.Errorf("@decoder is nil")
+ }
+
+ req, ok := reqObj.([]interface{})
+ if !ok {
+ return perrors.Errorf("@reqObj is not of type: []interface{}")
+ }
+ if len(req) < 7 {
+ return perrors.New("length of @reqObj should be 7")
+ }
+
+ var (
+ err error
+ dubboVersion, target, serviceVersion, method, argsTypes
interface{}
+ args
[]interface{}
+ )
+
+ dubboVersion, err = decoder.Decode()
+ if err != nil {
+ return perrors.WithStack(err)
+ }
+ req[0] = dubboVersion
+
+ target, err = decoder.Decode()
+ if err != nil {
+ return perrors.WithStack(err)
+ }
+ req[1] = target
+
+ serviceVersion, err = decoder.Decode()
+ if err != nil {
+ return perrors.WithStack(err)
+ }
+ req[2] = serviceVersion
+
+ method, err = decoder.Decode()
+ if err != nil {
+ return perrors.WithStack(err)
+ }
+ req[3] = method
+
+ argsTypes, err = decoder.Decode()
+ if err != nil {
+ return perrors.WithStack(err)
+ }
+ req[4] = argsTypes
+
+ ats := DescRegex.FindAllString(argsTypes.(string), -1)
+ var arg interface{}
+ for i := 0; i < len(ats); i++ {
+ arg, err = decoder.Decode()
+ if err != nil {
+ return perrors.WithStack(err)
+ }
+ args = append(args, arg)
+ }
+ req[5] = args
+
+ attachments, err := decoder.Decode()
+ if err != nil {
+ return perrors.WithStack(err)
+ }
+ if v, ok := attachments.(map[interface{}]interface{}); ok {
+ v[DUBBO_VERSION_KEY] = dubboVersion
+ req[6] = ToMapStringInterface(v)
+ return nil
+ }
+
+ return perrors.Errorf("get wrong attachments: %+v", attachments)
+}
+
+func ToMapStringInterface(origin map[interface{}]interface{})
map[string]interface{} {
+ dest := make(map[string]interface{}, len(origin))
+ for k, v := range origin {
+ if kv, ok := k.(string); ok {
+ if v == nil {
+ dest[kv] = ""
+ continue
+ }
+ dest[kv] = v
+ }
+ }
+ return dest
+}
diff --git a/protocol/dubbo/hessian2/hessian_request_test.go
b/protocol/dubbo/hessian2/hessian_request_test.go
new file mode 100644
index 0000000..98d5f23
--- /dev/null
+++ b/protocol/dubbo/hessian2/hessian_request_test.go
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package hessian2
+
+import (
+ "reflect"
+ "strconv"
+ "testing"
+ "time"
+)
+
+import (
+ hessian "github.com/apache/dubbo-go-hessian2"
+ "github.com/stretchr/testify/assert"
+)
+
+type TestEnumGender hessian.JavaEnum
+
+const (
+ MAN hessian.JavaEnum = iota
+ WOMAN
+)
+
+var genderName = map[hessian.JavaEnum]string{
+ MAN: "MAN",
+ WOMAN: "WOMAN",
+}
+
+var genderValue = map[string]hessian.JavaEnum{
+ "MAN": MAN,
+ "WOMAN": WOMAN,
+}
+
+func (g TestEnumGender) JavaClassName() string {
+ return "com.ikurento.test.TestEnumGender"
+}
+
+func (g TestEnumGender) String() string {
+ s, ok := genderName[hessian.JavaEnum(g)]
+ if ok {
+ return s
+ }
+
+ return strconv.Itoa(int(g))
+}
+
+func (g TestEnumGender) EnumValue(s string) hessian.JavaEnum {
+ v, ok := genderValue[s]
+ if ok {
+ return v
+ }
+
+ return hessian.InvalidJavaEnum
+}
+
+func TestPackRequest(t *testing.T) {
+ bytes, err := packRequest(Service{
+ Path: "test",
+ Interface: "ITest",
+ Version: "v1.0",
+ Method: "test",
+ Timeout: time.Second * 10,
+ }, DubboHeader{
+ SerialID: 0,
+ Type: PackageRequest,
+ ID: 123,
+ }, []interface{}{1, 2})
+
+ assert.Nil(t, err)
+
+ if bytes != nil {
+ t.Logf("pack request: %s", string(bytes))
+ }
+}
+
+func TestGetArgsTypeList(t *testing.T) {
+ type Test struct{}
+ str, err := getArgsTypeList([]interface{}{nil, 1, []int{2}, true,
[]bool{false}, "a", []string{"b"}, Test{}, &Test{}, []Test{},
map[string]Test{}, TestEnumGender(MAN)})
+ assert.NoError(t, err)
+ assert.Equal(t,
"VJ[JZ[ZLjava/lang/String;[Ljava/lang/String;Ljava/lang/Object;Ljava/lang/Object;[Ljava/lang/Object;Ljava/util/Map;Lcom/ikurento/test/TestEnumGender;",
str)
+}
+
+func TestDescRegex(t *testing.T) {
+ results := DescRegex.FindAllString("Ljava/lang/String;", -1)
+ assert.Equal(t, 1, len(results))
+ assert.Equal(t, "Ljava/lang/String;", results[0])
+
+ results = DescRegex.FindAllString("Ljava/lang/String;I", -1)
+ assert.Equal(t, 2, len(results))
+ assert.Equal(t, "Ljava/lang/String;", results[0])
+ assert.Equal(t, "I", results[1])
+
+ results = DescRegex.FindAllString("ILjava/lang/String;", -1)
+ assert.Equal(t, 2, len(results))
+ assert.Equal(t, "I", results[0])
+ assert.Equal(t, "Ljava/lang/String;", results[1])
+
+ results = DescRegex.FindAllString("ILjava/lang/String;IZ", -1)
+ assert.Equal(t, 4, len(results))
+ assert.Equal(t, "I", results[0])
+ assert.Equal(t, "Ljava/lang/String;", results[1])
+ assert.Equal(t, "I", results[2])
+ assert.Equal(t, "Z", results[3])
+
+ results = DescRegex.FindAllString("[Ljava/lang/String;[I", -1)
+ assert.Equal(t, 2, len(results))
+ assert.Equal(t, "[Ljava/lang/String;", results[0])
+ assert.Equal(t, "[I", results[1])
+}
+
+func TestIssue192(t *testing.T) {
+ type args struct {
+ origin map[interface{}]interface{}
+ }
+ tests := []struct {
+ name string
+ args args
+ want map[string]interface{}
+ }{
+ {
+ name: "not null",
+ args: args{
+ origin: map[interface{}]interface{}{
+ "1": nil,
+ "2": "3",
+ "": "",
+ },
+ },
+ want: map[string]interface{}{
+ "1": "",
+ "2": "3",
+ "": "",
+ },
+ },
+ }
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ if got := ToMapStringInterface(tt.args.origin);
!reflect.DeepEqual(got, tt.want) {
+ t.Errorf("ToMapStringString() = %v, want %v",
got, tt.want)
+ }
+ })
+ }
+}
diff --git a/protocol/dubbo/hessian2/hessian_response.go
b/protocol/dubbo/hessian2/hessian_response.go
new file mode 100644
index 0000000..982960e
--- /dev/null
+++ b/protocol/dubbo/hessian2/hessian_response.go
@@ -0,0 +1,377 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package hessian2
+
+import (
+ "encoding/binary"
+ "math"
+ "reflect"
+ "strconv"
+ "strings"
+)
+
+import (
+ hessian "github.com/apache/dubbo-go-hessian2"
+ "github.com/apache/dubbo-go-hessian2/java_exception"
+ perrors "github.com/pkg/errors"
+)
+
+// DubboResponse dubbo response
+type DubboResponse struct {
+ RspObj interface{}
+ Exception error
+ Attachments map[string]interface{}
+}
+
+// NewResponse create a new DubboResponse
+func NewResponse(rspObj interface{}, exception error, attachments
map[string]interface{}) *DubboResponse {
+ if attachments == nil {
+ attachments = make(map[string]interface{}, 8)
+ }
+ return &DubboResponse{
+ RspObj: rspObj,
+ Exception: exception,
+ Attachments: attachments,
+ }
+}
+
+// EnsureResponse check body type, make sure it's a DubboResponse or package
it as a DubboResponse
+func EnsureResponse(body interface{}) *DubboResponse {
+ if res, ok := body.(*DubboResponse); ok {
+ return res
+ }
+ if exp, ok := body.(error); ok {
+ return NewResponse(nil, exp, nil)
+ }
+ return NewResponse(body, nil, nil)
+}
+
+//
https://github.com/apache/dubbo/blob/dubbo-2.7.1/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/codec/ExchangeCodec.java#L256
+// hessian encode response
+func packResponse(header DubboHeader, ret interface{}) ([]byte, error) {
+ var (
+ byteArray []byte
+ )
+
+ response := EnsureResponse(ret)
+
+ hb := header.Type == PackageHeartbeat
+
+ // magic
+ if hb {
+ byteArray = append(byteArray,
DubboResponseHeartbeatHeader[:]...)
+ } else {
+ byteArray = append(byteArray, DubboResponseHeaderBytes[:]...)
+ }
+ // set serialID, identify serialization types, eg: fastjson->6,
hessian2->2
+ byteArray[2] |= header.SerialID & SERIAL_MASK
+ // response status
+ if header.ResponseStatus != 0 {
+ byteArray[3] = header.ResponseStatus
+ }
+
+ // request id
+ binary.BigEndian.PutUint64(byteArray[4:], uint64(header.ID))
+
+ // body
+ encoder := hessian.NewEncoder()
+ encoder.Append(byteArray[:HEADER_LENGTH])
+
+ if header.ResponseStatus == Response_OK {
+ if hb {
+ encoder.Encode(nil)
+ } else {
+ atta :=
isSupportResponseAttachment(response.Attachments[DUBBO_VERSION_KEY])
+
+ var resWithException, resValue, resNullValue int32
+ if atta {
+ resWithException =
RESPONSE_WITH_EXCEPTION_WITH_ATTACHMENTS
+ resValue = RESPONSE_VALUE_WITH_ATTACHMENTS
+ resNullValue =
RESPONSE_NULL_VALUE_WITH_ATTACHMENTS
+ } else {
+ resWithException = RESPONSE_WITH_EXCEPTION
+ resValue = RESPONSE_VALUE
+ resNullValue = RESPONSE_NULL_VALUE
+ }
+
+ if response.Exception != nil { // throw error
+ encoder.Encode(resWithException)
+ if t, ok :=
response.Exception.(java_exception.Throwabler); ok {
+ encoder.Encode(t)
+ } else {
+
encoder.Encode(java_exception.NewThrowable(response.Exception.Error()))
+ }
+ } else {
+ if response.RspObj == nil {
+ encoder.Encode(resNullValue)
+ } else {
+ encoder.Encode(resValue)
+ encoder.Encode(response.RspObj) //
result
+ }
+ }
+
+ if atta {
+ encoder.Encode(response.Attachments) //
attachments
+ }
+ }
+ } else {
+ if response.Exception != nil { // throw error
+ encoder.Encode(response.Exception.Error())
+ } else {
+ encoder.Encode(response.RspObj)
+ }
+ }
+
+ byteArray = encoder.Buffer()
+ byteArray = hessian.EncNull(byteArray) // if not, "java client" will
throw exception "unexpected end of file"
+ pkgLen := len(byteArray)
+ if pkgLen > int(DEFAULT_LEN) { // 8M
+ return nil, perrors.Errorf("Data length %d too large, max
payload %d", pkgLen, DEFAULT_LEN)
+ }
+ // byteArray{body length}
+ binary.BigEndian.PutUint32(byteArray[12:], uint32(pkgLen-HEADER_LENGTH))
+ return byteArray, nil
+
+}
+
+// hessian decode response body
+func unpackResponseBody(decoder *hessian.Decoder, resp interface{}) error {
+ // body
+ if decoder == nil {
+ return perrors.Errorf("@decoder is nil")
+ }
+ rspType, err := decoder.Decode()
+ if err != nil {
+ return perrors.WithStack(err)
+ }
+
+ response := EnsureResponse(resp)
+
+ switch rspType {
+ case RESPONSE_WITH_EXCEPTION, RESPONSE_WITH_EXCEPTION_WITH_ATTACHMENTS:
+ expt, err := decoder.Decode()
+ if err != nil {
+ return perrors.WithStack(err)
+ }
+ if rspType == RESPONSE_WITH_EXCEPTION_WITH_ATTACHMENTS {
+ attachments, err := decoder.Decode()
+ if err != nil {
+ return perrors.WithStack(err)
+ }
+ if v, ok := attachments.(map[interface{}]interface{});
ok {
+ atta := ToMapStringInterface(v)
+ response.Attachments = atta
+ } else {
+ return perrors.Errorf("get wrong attachments:
%+v", attachments)
+ }
+ }
+
+ if e, ok := expt.(error); ok {
+ response.Exception = e
+ } else {
+ response.Exception = perrors.Errorf("got exception:
%+v", expt)
+ }
+ return nil
+
+ case RESPONSE_VALUE, RESPONSE_VALUE_WITH_ATTACHMENTS:
+ rsp, err := decoder.Decode()
+ if err != nil {
+ return perrors.WithStack(err)
+ }
+ if rspType == RESPONSE_VALUE_WITH_ATTACHMENTS {
+ attachments, err := decoder.Decode()
+ if err != nil {
+ return perrors.WithStack(err)
+ }
+ if v, ok := attachments.(map[interface{}]interface{});
ok {
+ response.Attachments = ToMapStringInterface(v)
+ } else {
+ return perrors.Errorf("get wrong attachments:
%+v", attachments)
+ }
+ }
+
+ // If the return value is nil,
+ // we should consider it normal
+ if rsp == nil {
+ return nil
+ }
+
+ return perrors.WithStack(ReflectResponse(rsp, response.RspObj))
+
+ case RESPONSE_NULL_VALUE, RESPONSE_NULL_VALUE_WITH_ATTACHMENTS:
+ if rspType == RESPONSE_NULL_VALUE_WITH_ATTACHMENTS {
+ attachments, err := decoder.Decode()
+ if err != nil {
+ return perrors.WithStack(err)
+ }
+ if v, ok := attachments.(map[interface{}]interface{});
ok {
+ atta := ToMapStringInterface(v)
+ response.Attachments = atta
+ } else {
+ return perrors.Errorf("get wrong attachments:
%+v", attachments)
+ }
+ }
+ return nil
+ }
+
+ return nil
+}
+
+// CopySlice copy from inSlice to outSlice
+func CopySlice(inSlice, outSlice reflect.Value) error {
+ if inSlice.IsNil() {
+ return perrors.New("@in is nil")
+ }
+ if inSlice.Kind() != reflect.Slice {
+ return perrors.Errorf("@in is not slice, but %v",
inSlice.Kind())
+ }
+
+ for outSlice.Kind() == reflect.Ptr {
+ outSlice = outSlice.Elem()
+ }
+
+ size := inSlice.Len()
+ outSlice.Set(reflect.MakeSlice(outSlice.Type(), size, size))
+
+ for i := 0; i < size; i++ {
+ inSliceValue := inSlice.Index(i)
+ if !inSliceValue.Type().AssignableTo(outSlice.Index(i).Type()) {
+ return perrors.Errorf("in element type [%s] can not
assign to out element type [%s]",
+ inSliceValue.Type().String(),
outSlice.Type().String())
+ }
+ outSlice.Index(i).Set(inSliceValue)
+ }
+
+ return nil
+}
+
+// CopyMap copy from in map to out map
+func CopyMap(inMapValue, outMapValue reflect.Value) error {
+ if inMapValue.IsNil() {
+ return perrors.New("@in is nil")
+ }
+ if !inMapValue.CanInterface() {
+ return perrors.New("@in's Interface can not be used.")
+ }
+ if inMapValue.Kind() != reflect.Map {
+ return perrors.Errorf("@in is not map, but %v",
inMapValue.Kind())
+ }
+
+ outMapType := hessian.UnpackPtrType(outMapValue.Type())
+ hessian.SetValue(outMapValue, reflect.MakeMap(outMapType))
+
+ outKeyType := outMapType.Key()
+
+ outMapValue = hessian.UnpackPtrValue(outMapValue)
+ outValueType := outMapValue.Type().Elem()
+
+ for _, inKey := range inMapValue.MapKeys() {
+ inValue := inMapValue.MapIndex(inKey)
+
+ if !inKey.Type().AssignableTo(outKeyType) {
+ return perrors.Errorf("in Key:{type:%s, value:%#v} can
not assign to out Key:{type:%s} ",
+ inKey.Type().String(), inKey,
outKeyType.String())
+ }
+ if !inValue.Type().AssignableTo(outValueType) {
+ return perrors.Errorf("in Value:{type:%s, value:%#v}
can not assign to out value:{type:%s}",
+ inValue.Type().String(), inValue,
outValueType.String())
+ }
+ outMapValue.SetMapIndex(inKey, inValue)
+ }
+
+ return nil
+}
+
+// ReflectResponse reflect return value
+// TODO response object should not be copied again to another object, it
should be the exact type of the object
+func ReflectResponse(in interface{}, out interface{}) error {
+ if in == nil {
+ return perrors.Errorf("@in is nil")
+ }
+
+ if out == nil {
+ return perrors.Errorf("@out is nil")
+ }
+ if reflect.TypeOf(out).Kind() != reflect.Ptr {
+ return perrors.Errorf("@out should be a pointer")
+ }
+
+ inValue := hessian.EnsurePackValue(in)
+ outValue := hessian.EnsurePackValue(out)
+
+ outType := outValue.Type().String()
+ if outType == "interface {}" || outType == "*interface {}" {
+ hessian.SetValue(outValue, inValue)
+ return nil
+ }
+
+ switch inValue.Type().Kind() {
+ case reflect.Slice, reflect.Array:
+ return CopySlice(inValue, outValue)
+ case reflect.Map:
+ return CopyMap(inValue, outValue)
+ default:
+ hessian.SetValue(outValue, inValue)
+ }
+
+ return nil
+}
+
+var versionInt = make(map[string]int)
+
+//
https://github.com/apache/dubbo/blob/dubbo-2.7.1/dubbo-common/src/main/java/org/apache/dubbo/common/Version.java#L96
+// isSupportResponseAttachment is for compatibility among some dubbo version
+func isSupportResponseAttachment(ver interface{}) bool {
+ version, ok := ver.(string)
+ if !ok || len(version) == 0 {
+ return false
+ }
+
+ v, ok := versionInt[version]
+ if !ok {
+ v = version2Int(version)
+ if v == -1 {
+ return false
+ }
+ }
+
+ if v >= 2001000 && v <= 2060200 { // 2.0.10 ~ 2.6.2
+ return false
+ }
+ return v >= LOWEST_VERSION_FOR_RESPONSE_ATTACHMENT
+}
+
+func version2Int(ver interface{}) int {
+ version, ok := ver.(string)
+ if !ok || len(version) == 0 {
+ return 0
+ }
+ var v = 0
+ varr := strings.Split(version, ".")
+ length := len(varr)
+ for key, value := range varr {
+ v0, err := strconv.Atoi(value)
+ if err != nil {
+ return -1
+ }
+ v += v0 * int(math.Pow10((length-key-1)*2))
+ }
+ if length == 3 {
+ return v * 100
+ }
+ return v
+}
diff --git a/protocol/dubbo/hessian2/hessian_response_test.go
b/protocol/dubbo/hessian2/hessian_response_test.go
new file mode 100644
index 0000000..f5c84ba
--- /dev/null
+++ b/protocol/dubbo/hessian2/hessian_response_test.go
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package hessian2
+
+import (
+ "reflect"
+ "testing"
+)
+
+import (
+ hessian "github.com/apache/dubbo-go-hessian2"
+ "github.com/stretchr/testify/assert"
+)
+
+func doTestReflectResponse(t *testing.T, in interface{}, out interface{}) {
+ err := ReflectResponse(in, out)
+ if err != nil {
+ t.Error(err)
+ t.FailNow()
+ }
+
+ result := hessian.UnpackPtrValue(reflect.ValueOf(out)).Interface()
+
+ equal := reflect.DeepEqual(in, result)
+ if !equal {
+ t.Errorf("expect [%v]: %v, but got [%v]: %v",
reflect.TypeOf(in), in, reflect.TypeOf(result), result)
+ }
+}
+
+func TestReflectResponse(t *testing.T) {
+ var b bool
+ doTestReflectResponse(t, true, &b)
+ doTestReflectResponse(t, false, &b)
+
+ var i int
+ doTestReflectResponse(t, 123, &i)
+ doTestReflectResponse(t, 234, &i)
+
+ var i16 int16
+ doTestReflectResponse(t, int16(456), &i16)
+
+ var i64 int64
+ doTestReflectResponse(t, int64(789), &i64)
+
+ var s string
+ doTestReflectResponse(t, "hello world", &s)
+
+ type rr struct {
+ Name string
+ Num int
+ }
+
+ var r1 rr
+ doTestReflectResponse(t, rr{"dubbogo", 32}, &r1)
+
+ // ------ map test -------
+ m1 := make(map[interface{}]interface{})
+ var m1r map[interface{}]interface{}
+ m1["hello"] = "world"
+ m1[1] = "go"
+ m1["dubbo"] = 666
+ doTestReflectResponse(t, m1, &m1r)
+
+ m2 := make(map[string]string)
+ var m2r map[string]string
+ m2["hello"] = "world"
+ m2["dubbo"] = "666"
+ doTestReflectResponse(t, m2, &m2r)
+
+ m3 := make(map[string]rr)
+ var m3r map[string]rr
+ m3["dubbo"] = rr{"hello", 123}
+ m3["go"] = rr{"world", 456}
+ doTestReflectResponse(t, m3, &m3r)
+
+ // ------ slice test -------
+ s1 := []string{"abc", "def", "hello", "world"}
+ var s1r []string
+ doTestReflectResponse(t, s1, &s1r)
+
+ s2 := []rr{rr{"dubbo", 666}, rr{"go", 999}}
+ var s2r []rr
+ doTestReflectResponse(t, s2, &s2r)
+
+ s3 := []interface{}{rr{"dubbo", 666}, 123, "hello"}
+ var s3r []interface{}
+ doTestReflectResponse(t, s3, &s3r)
+
+ // ------ interface test -------
+ in1 := []interface{}{rr{"dubbo", 666}, 123, "hello"}
+ var inr1 *interface{}
+ doTestReflectResponse(t, in1,
reflect.New(reflect.TypeOf(inr1).Elem()).Interface())
+
+ in2 := make(map[string]rr)
+ var inr2 map[string]rr
+ m3["dubbo"] = rr{"hello", 123}
+ m3["go"] = rr{"world", 456}
+ doTestReflectResponse(t, in2, &inr2)
+}
+
+// separately test copy normal map to map[interface{}]interface{}
+func TestCopyMap(t *testing.T) {
+ type rr struct {
+ Name string
+ Num int
+ }
+
+ m3 := make(map[string]rr)
+ var m3r map[interface{}]interface{}
+ r1 := rr{"hello", 123}
+ r2 := rr{"world", 456}
+ m3["dubbo"] = r1
+ m3["go"] = r2
+
+ err := ReflectResponse(m3, &m3r)
+ if err != nil {
+ t.Error(err)
+ t.FailNow()
+ }
+
+ assert.Equal(t, 2, len(m3r))
+
+ rr1, ok := m3r["dubbo"]
+ assert.True(t, ok)
+ assert.True(t, reflect.DeepEqual(r1, rr1))
+
+ rr2, ok := m3r["go"]
+ assert.True(t, ok)
+ assert.True(t, reflect.DeepEqual(r2, rr2))
+}
+
+// separately test copy normal slice to []interface{}
+func TestCopySlice(t *testing.T) {
+ type rr struct {
+ Name string
+ Num int
+ }
+
+ r1 := rr{"hello", 123}
+ r2 := rr{"world", 456}
+
+ s1 := []rr{r1, r2}
+ var s1r []interface{}
+
+ err := ReflectResponse(s1, &s1r)
+ if err != nil {
+ t.Error(err)
+ t.FailNow()
+ }
+
+ assert.Equal(t, 2, len(s1r))
+ assert.True(t, reflect.DeepEqual(r1, s1r[0]))
+ assert.True(t, reflect.DeepEqual(r2, s1r[1]))
+}
+
+func TestIsSupportResponseAttachment(t *testing.T) {
+ is := isSupportResponseAttachment("2.X")
+ assert.False(t, is)
+
+ is = isSupportResponseAttachment("2.0.10")
+ assert.False(t, is)
+
+ is = isSupportResponseAttachment("2.5.3")
+ assert.False(t, is)
+
+ is = isSupportResponseAttachment("2.6.2")
+ assert.False(t, is)
+
+ is = isSupportResponseAttachment("1.5.5")
+ assert.False(t, is)
+
+ is = isSupportResponseAttachment("0.0.0")
+ assert.False(t, is)
+
+ is = isSupportResponseAttachment("2.0.2")
+ assert.True(t, is)
+
+ is = isSupportResponseAttachment("2.7.2")
+ assert.True(t, is)
+}
+
+func TestVersion2Int(t *testing.T) {
+ v := version2Int("2.1.3")
+ assert.Equal(t, 2010300, v)
+
+ v = version2Int("22.11.33")
+ assert.Equal(t, 22113300, v)
+
+ v = version2Int("222.111.333")
+ assert.Equal(t, 223143300, v)
+
+ v = version2Int("220.110.333")
+ assert.Equal(t, 221133300, v)
+
+ v = version2Int("229.119.333")
+ assert.Equal(t, 230223300, v)
+
+ v = version2Int("2222.1111.3333")
+ assert.Equal(t, 2233443300, v)
+
+ v = version2Int("2.11")
+ assert.Equal(t, 211, v)
+
+ v = version2Int("2.1.3.4")
+ assert.Equal(t, 2010304, v)
+
+ v = version2Int("2.1.3.4.5")
+ assert.Equal(t, 201030405, v)
+
+}
diff --git a/protocol/dubbo/listener.go b/protocol/dubbo/listener.go
index a17b282..180fd17 100644
--- a/protocol/dubbo/listener.go
+++ b/protocol/dubbo/listener.go
@@ -28,8 +28,6 @@ import (
import (
"github.com/apache/dubbo-getty"
- "github.com/apache/dubbo-go-hessian2"
- "github.com/opentracing/opentracing-go"
perrors "github.com/pkg/errors"
)
@@ -38,6 +36,7 @@ import (
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/common/logger"
"github.com/apache/dubbo-go/protocol"
+ "github.com/apache/dubbo-go/protocol/dubbo/hessian2"
"github.com/apache/dubbo-go/protocol/invocation"
)
@@ -105,8 +104,8 @@ func (h *RpcClientHandler) OnMessage(session getty.Session,
pkg interface{}) {
return
}
- if p.Header.Type&hessian.PackageHeartbeat != 0x00 {
- if p.Header.Type&hessian.PackageResponse != 0x00 {
+ if p.Header.Type&hessian2.PackageHeartbeat != 0x00 {
+ if p.Header.Type&hessian2.PackageResponse != 0x00 {
logger.Debugf("get rpc heartbeat response{header: %#v,
body: %#v}", p.Header, p.Body)
if p.Err != nil {
logger.Errorf("rpc heartbeat response{error:
%#v}", p.Err)
@@ -114,8 +113,8 @@ func (h *RpcClientHandler) OnMessage(session getty.Session,
pkg interface{}) {
h.conn.pool.rpcClient.removePendingResponse(SequenceType(p.Header.ID))
} else {
logger.Debugf("get rpc heartbeat request{header: %#v,
service: %#v, body: %#v}", p.Header, p.Service, p.Body)
- p.Header.ResponseStatus = hessian.Response_OK
- reply(session, p, hessian.PackageHeartbeat)
+ p.Header.ResponseStatus = hessian2.Response_OK
+ reply(session, p, hessian2.PackageHeartbeat)
}
return
}
@@ -229,24 +228,24 @@ func (h *RpcServerHandler) OnMessage(session
getty.Session, pkg interface{}) {
logger.Errorf("illegal package{%#v}", pkg)
return
}
- p.Header.ResponseStatus = hessian.Response_OK
+ p.Header.ResponseStatus = hessian2.Response_OK
// heartbeat
- if p.Header.Type&hessian.PackageHeartbeat != 0x00 {
+ if p.Header.Type&hessian2.PackageHeartbeat != 0x00 {
logger.Debugf("get rpc heartbeat request{header: %#v, service:
%#v, body: %#v}", p.Header, p.Service, p.Body)
- reply(session, p, hessian.PackageHeartbeat)
+ reply(session, p, hessian2.PackageHeartbeat)
return
}
twoway := true
// not twoway
- if p.Header.Type&hessian.PackageRequest_TwoWay == 0x00 {
+ if p.Header.Type&hessian2.PackageRequest_TwoWay == 0x00 {
twoway = false
}
defer func() {
if e := recover(); e != nil {
- p.Header.ResponseStatus = hessian.Response_SERVER_ERROR
+ p.Header.ResponseStatus = hessian2.Response_SERVER_ERROR
if err, ok := e.(error); ok {
logger.Errorf("OnMessage panic: %+v",
perrors.WithStack(err))
p.Body = perrors.WithStack(err)
@@ -261,7 +260,7 @@ func (h *RpcServerHandler) OnMessage(session getty.Session,
pkg interface{}) {
if !twoway {
return
}
- reply(session, p, hessian.PackageResponse)
+ reply(session, p, hessian2.PackageResponse)
}
}()
@@ -274,14 +273,14 @@ func (h *RpcServerHandler) OnMessage(session
getty.Session, pkg interface{}) {
if exporter == nil {
err := fmt.Errorf("don't have this exporter, key: %s",
u.ServiceKey())
logger.Errorf(err.Error())
- p.Header.ResponseStatus = hessian.Response_OK
+ p.Header.ResponseStatus = hessian2.Response_OK
p.Body = err
- reply(session, p, hessian.PackageResponse)
+ reply(session, p, hessian2.PackageResponse)
return
}
invoker := exporter.(protocol.Exporter).GetInvoker()
if invoker != nil {
- attachments :=
p.Body.(map[string]interface{})["attachments"].(map[string]string)
+ attachments :=
p.Body.(map[string]interface{})["attachments"].(map[string]interface{})
attachments[constant.LOCAL_ADDR] = session.LocalAddr()
attachments[constant.REMOTE_ADDR] = session.RemoteAddr()
@@ -292,19 +291,19 @@ func (h *RpcServerHandler) OnMessage(session
getty.Session, pkg interface{}) {
result := invoker.Invoke(ctx, inv)
if err := result.Error(); err != nil {
- p.Header.ResponseStatus = hessian.Response_OK
- p.Body = hessian.NewResponse(nil, err,
result.Attachments())
+ p.Header.ResponseStatus = hessian2.Response_OK
+ p.Body = hessian2.NewResponse(nil, err,
result.Attachments())
} else {
res := result.Result()
- p.Header.ResponseStatus = hessian.Response_OK
- p.Body = hessian.NewResponse(res, nil,
result.Attachments())
+ p.Header.ResponseStatus = hessian2.Response_OK
+ p.Body = hessian2.NewResponse(res, nil,
result.Attachments())
}
}
if !twoway {
return
}
- reply(session, p, hessian.PackageResponse)
+ reply(session, p, hessian2.PackageResponse)
}
// OnCron notified when RPC server session got any message in cron job
@@ -340,17 +339,16 @@ func rebuildCtx(inv *invocation.RPCInvocation)
context.Context {
ctx := context.Background()
// actually, if user do not use any opentracing framework, the err will
not be nil.
- spanCtx, err := opentracing.GlobalTracer().Extract(opentracing.TextMap,
- opentracing.TextMapCarrier(inv.Attachments()))
+ spanCtx, err := extractTraceCtx(inv)
if err == nil {
ctx = context.WithValue(ctx, constant.TRACING_REMOTE_SPAN_CTX,
spanCtx)
}
return ctx
}
-func reply(session getty.Session, req *DubboPackage, tp hessian.PackageType) {
+func reply(session getty.Session, req *DubboPackage, tp hessian2.PackageType) {
resp := &DubboPackage{
- Header: hessian.DubboHeader{
+ Header: hessian2.DubboHeader{
SerialID: req.Header.SerialID,
Type: tp,
ID: req.Header.ID,
@@ -358,7 +356,7 @@ func reply(session getty.Session, req *DubboPackage, tp
hessian.PackageType) {
},
}
- if req.Header.Type&hessian.PackageRequest != 0x00 {
+ if req.Header.Type&hessian2.PackageRequest != 0x00 {
resp.Body = req.Body
} else {
resp.Body = nil
diff --git a/protocol/dubbo/listener_test.go b/protocol/dubbo/listener_test.go
index 5f80981..5ab73fd 100644
--- a/protocol/dubbo/listener_test.go
+++ b/protocol/dubbo/listener_test.go
@@ -35,7 +35,7 @@ import (
// test rebuild the ctx
func TestRebuildCtx(t *testing.T) {
opentracing.SetGlobalTracer(mocktracer.New())
- attach := make(map[string]string, 10)
+ attach := make(map[string]interface{}, 10)
attach[constant.VERSION_KEY] = "1.0"
attach[constant.GROUP_KEY] = "MyGroup"
inv := invocation.NewRPCInvocation("MethodName", []interface{}{"OK",
"Hello"}, attach)
@@ -47,8 +47,7 @@ func TestRebuildCtx(t *testing.T) {
span, ctx := opentracing.StartSpanFromContext(ctx, "Test-Client")
- opentracing.GlobalTracer().Inject(span.Context(), opentracing.TextMap,
- opentracing.TextMapCarrier(inv.Attachments()))
+ injectTraceCtx(span, inv)
// rebuild the context success
inv = invocation.NewRPCInvocation("MethodName", []interface{}{"OK",
"Hello"}, attach)
ctx = rebuildCtx(inv)
diff --git a/protocol/dubbo/opentracing.go b/protocol/dubbo/opentracing.go
new file mode 100644
index 0000000..2dcd6a4
--- /dev/null
+++ b/protocol/dubbo/opentracing.go
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package dubbo
+
+import (
+ "github.com/opentracing/opentracing-go"
+)
+import (
+ invocation_impl "github.com/apache/dubbo-go/protocol/invocation"
+)
+
+func injectTraceCtx(currentSpan opentracing.Span, inv
*invocation_impl.RPCInvocation) error {
+ // inject opentracing ctx
+ traceAttachments := filterContext(inv.Attachments())
+ carrier := opentracing.TextMapCarrier(traceAttachments)
+ err := opentracing.GlobalTracer().Inject(currentSpan.Context(),
opentracing.TextMap, carrier)
+ if err == nil {
+ fillTraceAttachments(inv.Attachments(), traceAttachments)
+ }
+ return err
+}
+
+func extractTraceCtx(inv *invocation_impl.RPCInvocation)
(opentracing.SpanContext, error) {
+ traceAttachments := filterContext(inv.Attachments())
+ // actually, if user do not use any opentracing framework, the err will
not be nil.
+ spanCtx, err := opentracing.GlobalTracer().Extract(opentracing.TextMap,
+ opentracing.TextMapCarrier(traceAttachments))
+ return spanCtx, err
+}
+
+func filterContext(attachments map[string]interface{}) map[string]string {
+ var traceAttchment = make(map[string]string)
+ for k, v := range attachments {
+ if r, ok := v.(string); ok {
+ traceAttchment[k] = r
+ }
+ }
+ return traceAttchment
+}
+
+func fillTraceAttachments(attachments map[string]interface{}, traceAttachment
map[string]string) {
+ for k, v := range traceAttachment {
+ attachments[k] = v
+ }
+}
diff --git a/protocol/dubbo/readwriter.go b/protocol/dubbo/readwriter.go
index adc6311..a7b37aa 100644
--- a/protocol/dubbo/readwriter.go
+++ b/protocol/dubbo/readwriter.go
@@ -24,7 +24,6 @@ import (
import (
"github.com/apache/dubbo-getty"
- "github.com/apache/dubbo-go-hessian2"
perrors "github.com/pkg/errors"
)
@@ -32,6 +31,7 @@ import (
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/common/logger"
+ "github.com/apache/dubbo-go/protocol/dubbo/hessian2"
)
////////////////////////////////////////////
@@ -56,7 +56,7 @@ func (p *RpcClientPackageHandler) Read(ss getty.Session, data
[]byte) (interface
err := pkg.Unmarshal(buf, p.client)
if err != nil {
originErr := perrors.Cause(err)
- if originErr == hessian.ErrHeaderNotEnough || originErr ==
hessian.ErrBodyNotEnough {
+ if originErr == hessian2.ErrHeaderNotEnough || originErr ==
hessian2.ErrBodyNotEnough {
return nil, 0, nil
}
@@ -65,12 +65,12 @@ func (p *RpcClientPackageHandler) Read(ss getty.Session,
data []byte) (interface
return nil, 0, perrors.WithStack(err)
}
- if pkg.Header.Type&hessian.PackageRequest == 0x00 {
- pkg.Err = pkg.Body.(*hessian.Response).Exception
- pkg.Body = NewResponse(pkg.Body.(*hessian.Response).RspObj,
pkg.Body.(*hessian.Response).Attachments)
+ if pkg.Header.Type&hessian2.PackageRequest == 0x00 {
+ pkg.Err = pkg.Body.(*hessian2.DubboResponse).Exception
+ pkg.Body =
NewResponse(pkg.Body.(*hessian2.DubboResponse).RspObj,
pkg.Body.(*hessian2.DubboResponse).Attachments)
}
- return pkg, hessian.HEADER_LENGTH + pkg.Header.BodyLen, nil
+ return pkg, hessian2.HEADER_LENGTH + pkg.Header.BodyLen, nil
}
// Write encode @pkg.
@@ -111,7 +111,7 @@ func (p *RpcServerPackageHandler) Read(ss getty.Session,
data []byte) (interface
err := pkg.Unmarshal(buf)
if err != nil {
originErr := perrors.Cause(err)
- if originErr == hessian.ErrHeaderNotEnough || originErr ==
hessian.ErrBodyNotEnough {
+ if originErr == hessian2.ErrHeaderNotEnough || originErr ==
hessian2.ErrBodyNotEnough {
return nil, 0, nil
}
@@ -120,13 +120,13 @@ func (p *RpcServerPackageHandler) Read(ss getty.Session,
data []byte) (interface
return nil, 0, perrors.WithStack(err)
}
- if pkg.Header.Type&hessian.PackageHeartbeat == 0x00 {
+ if pkg.Header.Type&hessian2.PackageHeartbeat == 0x00 {
// convert params of request
req := pkg.Body.([]interface{}) // length of body should be 7
if len(req) > 0 {
var dubboVersion, argsTypes string
var args []interface{}
- var attachments map[string]string
+ var attachments map[string]interface{}
if req[0] != nil {
dubboVersion = req[0].(string)
}
@@ -146,18 +146,18 @@ func (p *RpcServerPackageHandler) Read(ss getty.Session,
data []byte) (interface
args = req[5].([]interface{})
}
if req[6] != nil {
- attachments = req[6].(map[string]string)
+ attachments = req[6].(map[string]interface{})
}
- if pkg.Service.Path == "" &&
len(attachments[constant.PATH_KEY]) > 0 {
- pkg.Service.Path =
attachments[constant.PATH_KEY]
+ if pkg.Service.Path == "" &&
attachments[constant.PATH_KEY] != nil &&
len(attachments[constant.PATH_KEY].(string)) > 0 {
+ pkg.Service.Path =
attachments[constant.PATH_KEY].(string)
}
- if _, ok := attachments[constant.INTERFACE_KEY]; ok {
- pkg.Service.Interface =
attachments[constant.INTERFACE_KEY]
+ if inter, ok := attachments[constant.INTERFACE_KEY]; ok
&& inter != nil {
+ pkg.Service.Interface = inter.(string)
} else {
pkg.Service.Interface = pkg.Service.Path
}
- if len(attachments[constant.GROUP_KEY]) > 0 {
- pkg.Service.Group =
attachments[constant.GROUP_KEY]
+ if attachments[constant.GROUP_KEY] != nil &&
len(attachments[constant.GROUP_KEY].(string)) > 0 {
+ pkg.Service.Group =
attachments[constant.GROUP_KEY].(string)
}
pkg.Body = map[string]interface{}{
"dubboVersion": dubboVersion,
@@ -169,7 +169,7 @@ func (p *RpcServerPackageHandler) Read(ss getty.Session,
data []byte) (interface
}
}
- return pkg, hessian.HEADER_LENGTH + pkg.Header.BodyLen, nil
+ return pkg, hessian2.HEADER_LENGTH + pkg.Header.BodyLen, nil
}
// Write encode @pkg.
diff --git a/protocol/invocation.go b/protocol/invocation.go
index 296ec05..452f619 100644
--- a/protocol/invocation.go
+++ b/protocol/invocation.go
@@ -34,15 +34,16 @@ type Invocation interface {
// Reply gets response of request
Reply() interface{}
// Attachments gets all attachments
- Attachments() map[string]string
- // AttachmentsByKey gets attachment by key , if nil then return default
value
+ Attachments() map[string]interface{}
+ // AttachmentsByKey gets attachment by key , if nil then return default
value. (It will be deprecated in the future)
AttachmentsByKey(string, string) string
+ Attachment(string) interface{}
// Attributes refers to dubbo 2.7.6. It is different from attachment.
It is used in internal process.
Attributes() map[string]interface{}
// AttributeByKey gets attribute by key , if nil then return default
value
AttributeByKey(string, interface{}) interface{}
// SetAttachments sets attribute by @key and @value.
- SetAttachments(key string, value string)
+ SetAttachments(key string, value interface{})
// Invoker gets the invoker in current context.
Invoker() Invoker
}
diff --git a/protocol/invocation/rpcinvocation.go
b/protocol/invocation/rpcinvocation.go
index c72e105..35d1296 100644
--- a/protocol/invocation/rpcinvocation.go
+++ b/protocol/invocation/rpcinvocation.go
@@ -39,7 +39,7 @@ type RPCInvocation struct {
arguments []interface{}
reply interface{}
callBack interface{}
- attachments map[string]string
+ attachments map[string]interface{}
// Refer to dubbo 2.7.6. It is different from attachment. It is used
in internal process.
attributes map[string]interface{}
invoker protocol.Invoker
@@ -47,7 +47,7 @@ type RPCInvocation struct {
}
// NewRPCInvocation creates a RPC invocation.
-func NewRPCInvocation(methodName string, arguments []interface{}, attachments
map[string]string) *RPCInvocation {
+func NewRPCInvocation(methodName string, arguments []interface{}, attachments
map[string]interface{}) *RPCInvocation {
return &RPCInvocation{
methodName: methodName,
arguments: arguments,
@@ -99,7 +99,7 @@ func (r *RPCInvocation) SetReply(reply interface{}) {
}
// Attachments gets all attachments of RPC.
-func (r *RPCInvocation) Attachments() map[string]string {
+func (r *RPCInvocation) Attachments() map[string]interface{} {
return r.attachments
}
@@ -112,11 +112,25 @@ func (r *RPCInvocation) AttachmentsByKey(key string,
defaultValue string) string
}
value, ok := r.attachments[key]
if ok {
- return value
+ return value.(string)
}
return defaultValue
}
+// Attachment returns the corresponding value from dubbo's attachment with the
given key.
+func (r *RPCInvocation) Attachment(key string) interface{} {
+ r.lock.RLock()
+ defer r.lock.RUnlock()
+ if r.attachments == nil {
+ return nil
+ }
+ value, ok := r.attachments[key]
+ if ok {
+ return value
+ }
+ return nil
+}
+
// Attributes gets all attributes of RPC.
func (r *RPCInvocation) Attributes() map[string]interface{} {
return r.attributes
@@ -134,11 +148,11 @@ func (r *RPCInvocation) AttributeByKey(key string,
defaultValue interface{}) int
}
// SetAttachments sets attribute by @key and @value.
-func (r *RPCInvocation) SetAttachments(key string, value string) {
+func (r *RPCInvocation) SetAttachments(key string, value interface{}) {
r.lock.Lock()
defer r.lock.Unlock()
if r.attachments == nil {
- r.attachments = make(map[string]string)
+ r.attachments = make(map[string]interface{})
}
r.attachments[key] = value
}
@@ -221,7 +235,7 @@ func WithCallBack(callBack interface{}) option {
}
// WithAttachments creates option with @attachments.
-func WithAttachments(attachments map[string]string) option {
+func WithAttachments(attachments map[string]interface{}) option {
return func(invo *RPCInvocation) {
invo.attachments = attachments
}
diff --git a/protocol/jsonrpc/server.go b/protocol/jsonrpc/server.go
index 29eba02..9755a48 100644
--- a/protocol/jsonrpc/server.go
+++ b/protocol/jsonrpc/server.go
@@ -343,7 +343,7 @@ func serveRequest(ctx context.Context, header
map[string]string, body []byte, co
exporter, _ := jsonrpcProtocol.ExporterMap().Load(path)
invoker := exporter.(*JsonrpcExporter).GetInvoker()
if invoker != nil {
- result := invoker.Invoke(ctx,
invocation.NewRPCInvocation(methodName, args, map[string]string{
+ result := invoker.Invoke(ctx,
invocation.NewRPCInvocation(methodName, args, map[string]interface{}{
constant.PATH_KEY: path,
constant.VERSION_KEY: codec.req.Version}))
if err := result.Error(); err != nil {
diff --git a/protocol/rest/server/rest_server.go
b/protocol/rest/server/rest_server.go
index fbd6fb7..d9542bb 100644
--- a/protocol/rest/server/rest_server.go
+++ b/protocol/rest/server/rest_server.go
@@ -111,7 +111,7 @@ func GetRouteFunc(invoker protocol.Invoker, methodConfig
*rest_config.RestMethod
logger.Errorf("[Go Restful] WriteErrorString
error:%v", err)
}
}
- result := invoker.Invoke(context.Background(),
invocation.NewRPCInvocation(methodConfig.MethodName, args,
make(map[string]string)))
+ result := invoker.Invoke(context.Background(),
invocation.NewRPCInvocation(methodConfig.MethodName, args,
make(map[string]interface{})))
if result.Error() != nil {
err = resp.WriteError(http.StatusInternalServerError,
result.Error())
if err != nil {
diff --git a/protocol/result.go b/protocol/result.go
index 2a33be6..a36b16d 100644
--- a/protocol/result.go
+++ b/protocol/result.go
@@ -28,13 +28,14 @@ type Result interface {
// Result gets invoker result.
Result() interface{}
// SetAttachments replaces the existing attachments with the specified
param.
- SetAttachments(map[string]string)
+ SetAttachments(map[string]interface{})
// Attachments gets all attachments
- Attachments() map[string]string
+ Attachments() map[string]interface{}
+
// AddAttachment adds the specified map to existing attachments in this
instance.
- AddAttachment(string, string)
+ AddAttachment(string, interface{})
// Attachment gets attachment by key with default value.
- Attachment(string, string) string
+ Attachment(string, interface{}) interface{}
}
/////////////////////////////
@@ -43,7 +44,7 @@ type Result interface {
// RPCResult is default RPC result.
type RPCResult struct {
- Attrs map[string]string
+ Attrs map[string]interface{}
Err error
Rest interface{}
}
@@ -69,22 +70,22 @@ func (r *RPCResult) Result() interface{} {
}
// SetAttachments replaces the existing attachments with the specified param.
-func (r *RPCResult) SetAttachments(attr map[string]string) {
+func (r *RPCResult) SetAttachments(attr map[string]interface{}) {
r.Attrs = attr
}
// Attachments gets all attachments
-func (r *RPCResult) Attachments() map[string]string {
+func (r *RPCResult) Attachments() map[string]interface{} {
return r.Attrs
}
// AddAttachment adds the specified map to existing attachments in this
instance.
-func (r *RPCResult) AddAttachment(key, value string) {
+func (r *RPCResult) AddAttachment(key string, value interface{}) {
r.Attrs[key] = value
}
// Attachment gets attachment by key with default value.
-func (r *RPCResult) Attachment(key, defaultValue string) string {
+func (r *RPCResult) Attachment(key string, defaultValue interface{})
interface{} {
v, ok := r.Attrs[key]
if !ok {
v = defaultValue
diff --git a/test/integrate/dubbo/go-client/go.mod
b/test/integrate/dubbo/go-client/go.mod
index 4708eb1..b0be45a 100644
--- a/test/integrate/dubbo/go-client/go.mod
+++ b/test/integrate/dubbo/go-client/go.mod
@@ -1,3 +1,7 @@
module github.com/apache/dubbo-go/test/integrate/dubbo/go-client
+require (
+ github.com/apache/dubbo-go-hessian2
v1.6.0-rc1.0.20200906044240-6c1fb5c3bd44
+)
+
go 1.13
diff --git a/test/integrate/dubbo/go-server/go.mod
b/test/integrate/dubbo/go-server/go.mod
index 9e11623..6c530f6 100644
--- a/test/integrate/dubbo/go-server/go.mod
+++ b/test/integrate/dubbo/go-server/go.mod
@@ -1,3 +1,7 @@
module github.com/apache/dubbo-go/test/integrate/dubbo/go-server
+require (
+ github.com/apache/dubbo-go-hessian2
v1.6.0-rc1.0.20200906044240-6c1fb5c3bd44
+)
+
go 1.13