This is an automated email from the ASF dual-hosted git repository.
alexstocks pushed a commit to branch 1.5
in repository https://gitbox.apache.org/repos/asf/dubbo-go.git
The following commit(s) were added to refs/heads/1.5 by this push:
new 81c2e23 feat(getty): upgrade getty version to 1.4.5 for dubbo-go 1.5
(#1378)
81c2e23 is described below
commit 81c2e233f3c9327d483fb80e979a07d17895b6ae
Author: Mulavar <[email protected]>
AuthorDate: Fri Aug 20 10:07:38 2021 +0800
feat(getty): upgrade getty version to 1.4.5 for dubbo-go 1.5 (#1378)
* feat(getty): upgrade getty version to 1.4.5
* rft(getty): refactor getty unit test
* rft(getty): delete outdated getty params
Co-authored-by: dongjianhui03 <[email protected]>
---
config/testdata/consumer_config.properties | 1 -
config/testdata/consumer_config.yml | 1 -
.../testdata/consumer_config_with_configcenter.yml | 1 -
.../testdata/consumer_config_withoutProtocol.yml | 1 -
config/testdata/provider_config.properties | 1 -
config/testdata/provider_config.yml | 1 -
.../testdata/provider_config_withoutProtocol.yml | 1 -
config_center/apollo/impl_test.go | 1 -
config_center/nacos/facade.go | 5 +-
go.mod | 2 +-
go.sum | 9 +-
.../service/exporter/configurable/exporter_test.go | 1 -
protocol/dubbo/dubbo_invoker_test.go | 2 -
protocol/dubbo/dubbo_protocol_test.go | 2 -
registry/consul/registry.go | 6 +-
registry/etcdv3/listener_test.go | 4 +-
registry/kubernetes/listener.go | 1 +
registry/kubernetes/registry.go | 8 +-
registry/kubernetes/registry_test.go | 1 +
remoting/etcdv3/client.go | 2 +
remoting/etcdv3/client_test.go | 3 +
remoting/etcdv3/facade.go | 5 +-
remoting/etcdv3/listener.go | 1 +
remoting/getty/config.go | 7 +-
remoting/getty/dubbo_codec_for_test.go | 5 +-
remoting/getty/getty_client.go | 7 +-
remoting/getty/getty_client_test.go | 154 ++++++---------------
remoting/getty/getty_server.go | 3 +
remoting/getty/listener.go | 6 +-
remoting/getty/listener_test.go | 1 +
remoting/getty/opentracing.go | 1 +
remoting/getty/readwriter.go | 6 +-
remoting/getty/readwriter_test.go | 151 +++++++++++++-------
remoting/zookeeper/client.go | 1 +
.../curator_discovery/service_discovery.go | 2 +
remoting/zookeeper/facade.go | 2 +
remoting/zookeeper/listener.go | 7 +-
test/integrate/dubbo/go-client/client.yml | 1 -
test/integrate/dubbo/go-server/server.yml | 1 -
tools/cli/example/server/config/server.yml | 1 -
40 files changed, 201 insertions(+), 215 deletions(-)
diff --git a/config/testdata/consumer_config.properties
b/config/testdata/consumer_config.properties
index da9fe4f..aabb4fc 100644
--- a/config/testdata/consumer_config.properties
+++ b/config/testdata/consumer_config.properties
@@ -44,7 +44,6 @@ protocol_conf.dubbo.getty_session_param.tcp_keep_alive=true
protocol_conf.dubbo.getty_session_param.keep_alive_period=120s
protocol_conf.dubbo.getty_session_param.tcp_r_buf_size=262144
protocol_conf.dubbo.getty_session_param.tcp_w_buf_size=65536
-protocol_conf.dubbo.getty_session_param.pkg_wq_size=512
protocol_conf.dubbo.getty_session_param.tcp_read_timeout=1s
protocol_conf.dubbo.getty_session_param.tcp_write_timeout=5s
protocol_conf.dubbo.getty_session_param.wait_timeout=1s
diff --git a/config/testdata/consumer_config.yml
b/config/testdata/consumer_config.yml
index 2034186..c011c9e 100644
--- a/config/testdata/consumer_config.yml
+++ b/config/testdata/consumer_config.yml
@@ -87,7 +87,6 @@ protocol_conf:
keep_alive_period: "120s"
tcp_r_buf_size: 262144
tcp_w_buf_size: 65536
- pkg_wq_size: 512
tcp_read_timeout: "1s"
tcp_write_timeout: "5s"
wait_timeout: "1s"
diff --git a/config/testdata/consumer_config_with_configcenter.yml
b/config/testdata/consumer_config_with_configcenter.yml
index ebe56fa..35b0e69 100644
--- a/config/testdata/consumer_config_with_configcenter.yml
+++ b/config/testdata/consumer_config_with_configcenter.yml
@@ -36,7 +36,6 @@ protocol_conf:
keep_alive_period: "120s"
tcp_r_buf_size: 262144
tcp_w_buf_size: 65536
- pkg_wq_size: 512
tcp_read_timeout: "1s"
tcp_write_timeout: "5s"
wait_timeout: "1s"
diff --git a/config/testdata/consumer_config_withoutProtocol.yml
b/config/testdata/consumer_config_withoutProtocol.yml
index 32bad8b..09cd9a1 100644
--- a/config/testdata/consumer_config_withoutProtocol.yml
+++ b/config/testdata/consumer_config_withoutProtocol.yml
@@ -73,7 +73,6 @@ protocol_conf:
keep_alive_period: "120s"
tcp_r_buf_size: 262144
tcp_w_buf_size: 65536
- pkg_wq_size: 512
tcp_read_timeout: "1s"
tcp_write_timeout: "5s"
wait_timeout: "1s"
diff --git a/config/testdata/provider_config.properties
b/config/testdata/provider_config.properties
index f7d70f5..0e47bc6 100644
--- a/config/testdata/provider_config.properties
+++ b/config/testdata/provider_config.properties
@@ -50,7 +50,6 @@ protocol_conf.dubbo.getty_session_param.tcp_keep_alive=true
protocol_conf.dubbo.getty_session_param.keep_alive_period=120s
protocol_conf.dubbo.getty_session_param.tcp_r_buf_size=262144
protocol_conf.dubbo.getty_session_param.tcp_w_buf_size=65536
-protocol_conf.dubbo.getty_session_param.pkg_wq_size=512
protocol_conf.dubbo.getty_session_param.tcp_read_timeout=1s
protocol_conf.dubbo.getty_session_param.tcp_write_timeout=5s
protocol_conf.dubbo.getty_session_param.wait_timeout=1s
diff --git a/config/testdata/provider_config.yml
b/config/testdata/provider_config.yml
index 7c46f91..18907f0 100644
--- a/config/testdata/provider_config.yml
+++ b/config/testdata/provider_config.yml
@@ -92,7 +92,6 @@ protocol_conf:
keep_alive_period: "120s"
tcp_r_buf_size: 262144
tcp_w_buf_size: 65536
- pkg_wq_size: 512
tcp_read_timeout: "1s"
tcp_write_timeout: "5s"
wait_timeout: "1s"
diff --git a/config/testdata/provider_config_withoutProtocol.yml
b/config/testdata/provider_config_withoutProtocol.yml
index 532d300..c76dca2 100644
--- a/config/testdata/provider_config_withoutProtocol.yml
+++ b/config/testdata/provider_config_withoutProtocol.yml
@@ -72,7 +72,6 @@ protocol_conf:
keep_alive_period: "120s"
tcp_r_buf_size: 262144
tcp_w_buf_size: 65536
- pkg_wq_size: 512
tcp_read_timeout: "1s"
tcp_write_timeout: "5s"
wait_timeout: "1s"
diff --git a/config_center/apollo/impl_test.go
b/config_center/apollo/impl_test.go
index 43e306e..c41b558 100644
--- a/config_center/apollo/impl_test.go
+++ b/config_center/apollo/impl_test.go
@@ -111,7 +111,6 @@ var (
"application.environment": "dev",
"services.UserProvider.protocol": "dubbo",
"application.organization": "ikurento.com",
- "protocol_conf.dubbo.getty_session_param.pkg_wq_size": "512",
"services.UserProvider.methods[0].loadbalance": "random"
},
"releaseKey": "20191104105242-0f13805d89f834a4"
diff --git a/config_center/nacos/facade.go b/config_center/nacos/facade.go
index d089ed2..9f847aa 100644
--- a/config_center/nacos/facade.go
+++ b/config_center/nacos/facade.go
@@ -22,7 +22,8 @@ import (
"time"
)
import (
- "github.com/apache/dubbo-getty"
+ gxtime "github.com/dubbogo/gost/time"
+
perrors "github.com/pkg/errors"
)
@@ -74,7 +75,7 @@ LOOP:
case <-r.GetDone():
logger.Warnf("(NacosProviderRegistry)reconnectZkRegistry goroutine exit now...")
break LOOP
- case
<-getty.GetTimeWheel().After(time.Duration(failTimes*connDelay) * time.Second):
// Prevent crazy reconnection nacos.
+ case
<-gxtime.After(time.Duration(failTimes*connDelay) * time.Second): // Prevent
crazy reconnection nacos.
}
err = ValidateNacosClient(r,
WithNacosName(nacosName))
logger.Infof("NacosProviderRegistry.validateNacosClient(nacosAddr{%s}) =
error{%#v}",
diff --git a/go.mod b/go.mod
index 3ca198b..4af07b3 100644
--- a/go.mod
+++ b/go.mod
@@ -7,7 +7,7 @@ require (
github.com/Workiva/go-datastructures v1.0.52
github.com/afex/hystrix-go v0.0.0-20180502004556-fa1af6a1f4f5
github.com/alibaba/sentinel-golang v1.0.2
- github.com/apache/dubbo-getty v1.4.3
+ github.com/apache/dubbo-getty v1.4.5
github.com/apache/dubbo-go-hessian2 v1.9.1
github.com/coreos/etcd v3.3.25+incompatible
github.com/creasty/defaults v1.5.1
diff --git a/go.sum b/go.sum
index a2aee3e..0a807e1 100644
--- a/go.sum
+++ b/go.sum
@@ -80,8 +80,8 @@ github.com/alibaba/sentinel-golang v1.0.2/go.mod
h1:QsB99f/z35D2AiMrAWwgWE85kDTk
github.com/aliyun/alibaba-cloud-sdk-go v1.61.18
h1:zOVTBdCKFd9JbCKz9/nt+FovbjPFmb7mUnp8nH9fQBA=
github.com/aliyun/alibaba-cloud-sdk-go v1.61.18/go.mod
h1:v8ESoHo4SyHmuB4b1tJqDHxfTGEciD+yhvOU/5s1Rfk=
github.com/antihax/optional v1.0.0/go.mod
h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY=
-github.com/apache/dubbo-getty v1.4.3
h1:PCKpryDasKOxwT5MBC6MIMO+0NLOaHF6Xco9YXQw7HI=
-github.com/apache/dubbo-getty v1.4.3/go.mod
h1:ansXgKxxyhCOiQL29nO5ce1MDcEKmCyZuNR9oMs3hek=
+github.com/apache/dubbo-getty v1.4.5
h1:MptKbjC0n2Mo/8eFPwirSInH2BfdNG4IZch43PdNvIM=
+github.com/apache/dubbo-getty v1.4.5/go.mod
h1:mcDyiu7M/TVrYDyL8TxDemQkOdvEqqHSQ4jOuYejY1w=
github.com/apache/dubbo-go-hessian2 v1.9.1
h1:ceSsU/9z/gv3hzUpl8GceEhQvF3i0BionfdHUGMmjHU=
github.com/apache/dubbo-go-hessian2 v1.9.1/go.mod
h1:xQUjE7F8PX49nm80kChFvepA/AvqAZ0oh/UaB6+6pBE=
github.com/apache/thrift v0.12.0/go.mod
h1:cp2SuWMxlEZw2r+iP2GNCdIi4C1qmUzdZFSVb+bacwQ=
@@ -180,7 +180,7 @@ github.com/docker/spdystream
v0.0.0-20160310174837-449fdfce4d96/go.mod h1:Qh8CwZ
github.com/dubbogo/go-zookeeper v1.0.3
h1:UkuY+rBsxdT7Bs63QAzp9z7XqQ53W1j8E5rwl83me8g=
github.com/dubbogo/go-zookeeper v1.0.3/go.mod
h1:fn6n2CAEer3novYgk9ULLwAjuV8/g4DdC2ENwRb6E+c=
github.com/dubbogo/gost v1.9.0/go.mod
h1:pPTjVyoJan3aPxBPNUX0ADkXjPibLo+/Ib0/fADXSG8=
-github.com/dubbogo/gost v1.10.1/go.mod
h1:+mQGS51XQEUWZP2JeGZTxJwipjRKtJO7Tr+FOg+72rI=
+github.com/dubbogo/gost v1.11.12/go.mod
h1:vIcP9rqz2KsXHPjsAwIUtfJIJjppQLQDcYaZTy/61jI=
github.com/dubbogo/gost v1.11.14
h1:9lfcdILOmqTOVAW1fPHa5uf1NrD6jlIOBe4vf8576yQ=
github.com/dubbogo/gost v1.11.14/go.mod
h1:vIcP9rqz2KsXHPjsAwIUtfJIJjppQLQDcYaZTy/61jI=
github.com/dubbogo/jsonparser v1.0.1/go.mod
h1:tYAtpctvSP/tWw4MeelsowSPgXQRVHHWbqL6ynps8jU=
@@ -572,6 +572,7 @@ github.com/modern-go/reflect2
v0.0.0-20180320133207-05fbef0ca5da/go.mod h1:bx2lN
github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742/go.mod
h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0=
github.com/modern-go/reflect2 v1.0.1
h1:9f412s+6RmYXLWZSEzVVgPGK7C2PphHj5RJrvfx9AWI=
github.com/modern-go/reflect2 v1.0.1/go.mod
h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0=
+github.com/montanaflynn/stats v0.6.6/go.mod
h1:etXPPgVO6n31NxCd9KQUMvCM+ve0ruNzt6R8Bnaayow=
github.com/mschoch/smat v0.0.0-20160514031455-90eadee771ae
h1:VeRdUYdCw49yizlSbMEn2SZ+gT+3IUKx8BqxyQdz+BY=
github.com/mschoch/smat v0.0.0-20160514031455-90eadee771ae/go.mod
h1:qAyveg+e4CE+eKJXWVjKXM4ck2QobLqTDytGJbLLhJg=
github.com/munnerz/goautoneg v0.0.0-20120707110453-a547fc61f48d/go.mod
h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ=
@@ -707,7 +708,6 @@ github.com/sean-/pager
v0.0.0-20180208200047-666be9bf53b5/go.mod h1:BeybITEsBEg6
github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529
h1:nn5Wsu0esKSJiIVhscUtVbo7ada43DJhG55ua/hjS5I=
github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529/go.mod
h1:DxrIzT+xaE7yg65j358z/aeFdxmN0P9QXhEzd20vsDc=
github.com/shirou/gopsutil v0.0.0-20181107111621-48177ef5f880/go.mod
h1:5b4v6he4MtMOwMlS0TUMTu2PcXUg8+E1lC7eC3UO/RA=
-github.com/shirou/gopsutil
v3.20.11-0.20201116082039-2fb5da2f2449+incompatible/go.mod
h1:5b4v6he4MtMOwMlS0TUMTu2PcXUg8+E1lC7eC3UO/RA=
github.com/shirou/gopsutil v3.20.11+incompatible
h1:LJr4ZQK4mPpIV5gOa4jCOKOGb4ty4DZO54I4FGqIpto=
github.com/shirou/gopsutil v3.20.11+incompatible/go.mod
h1:5b4v6he4MtMOwMlS0TUMTu2PcXUg8+E1lC7eC3UO/RA=
github.com/shirou/w32 v0.0.0-20160930032740-bb4de0191aa4/go.mod
h1:qsXQc7+bwAM3Q1u/4XEfrquwF8Lw7D7y5cD8CuHnfIc=
@@ -763,7 +763,6 @@ github.com/stretchr/testify v1.2.2/go.mod
h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXf
github.com/stretchr/testify v1.3.0/go.mod
h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.4.0/go.mod
h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
github.com/stretchr/testify v1.5.1/go.mod
h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA=
-github.com/stretchr/testify v1.6.1/go.mod
h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.0
h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY=
github.com/stretchr/testify v1.7.0/go.mod
h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/subosito/gotenv v1.2.0
h1:Slr1R9HxAlEKefgq5jn9U+DnETlIUa6HfgEzj0g5d7s=
diff --git a/metadata/service/exporter/configurable/exporter_test.go
b/metadata/service/exporter/configurable/exporter_test.go
index 7c2baa2..51acaa7 100644
--- a/metadata/service/exporter/configurable/exporter_test.go
+++ b/metadata/service/exporter/configurable/exporter_test.go
@@ -46,7 +46,6 @@ func TestConfigurableExporter(t *testing.T) {
KeepAlivePeriod: "120s",
TcpRBufSize: 262144,
TcpWBufSize: 65536,
- PkgWQSize: 512,
TcpReadTimeout: "1s",
TcpWriteTimeout: "5s",
WaitTimeout: "1s",
diff --git a/protocol/dubbo/dubbo_invoker_test.go
b/protocol/dubbo/dubbo_invoker_test.go
index 3f9c849..782c615 100644
--- a/protocol/dubbo/dubbo_invoker_test.go
+++ b/protocol/dubbo/dubbo_invoker_test.go
@@ -113,7 +113,6 @@ func InitTest(t *testing.T) (protocol.Protocol,
*common.URL) {
KeepAlivePeriod: "120s",
TcpRBufSize: 262144,
TcpWBufSize: 65536,
- PkgWQSize: 512,
TcpReadTimeout: "4s",
TcpWriteTimeout: "5s",
WaitTimeout: "1s",
@@ -131,7 +130,6 @@ func InitTest(t *testing.T) (protocol.Protocol,
*common.URL) {
KeepAlivePeriod: "120s",
TcpRBufSize: 262144,
TcpWBufSize: 65536,
- PkgWQSize: 512,
TcpReadTimeout: "1s",
TcpWriteTimeout: "5s",
WaitTimeout: "1s",
diff --git a/protocol/dubbo/dubbo_protocol_test.go
b/protocol/dubbo/dubbo_protocol_test.go
index 97669a2..9c547a2 100644
--- a/protocol/dubbo/dubbo_protocol_test.go
+++ b/protocol/dubbo/dubbo_protocol_test.go
@@ -52,7 +52,6 @@ func initDubboInvokerTest() {
KeepAlivePeriod: "120s",
TcpRBufSize: 262144,
TcpWBufSize: 65536,
- PkgWQSize: 512,
TcpReadTimeout: "1s",
TcpWriteTimeout: "5s",
WaitTimeout: "1s",
@@ -70,7 +69,6 @@ func initDubboInvokerTest() {
KeepAlivePeriod: "120s",
TcpRBufSize: 262144,
TcpWBufSize: 65536,
- PkgWQSize: 512,
TcpReadTimeout: "4s",
TcpWriteTimeout: "5s",
WaitTimeout: "1s",
diff --git a/registry/consul/registry.go b/registry/consul/registry.go
index a50536f..a2935b1 100644
--- a/registry/consul/registry.go
+++ b/registry/consul/registry.go
@@ -23,8 +23,10 @@ import (
)
import (
- getty "github.com/apache/dubbo-getty"
+ gxtime "github.com/dubbogo/gost/time"
+
consul "github.com/hashicorp/consul/api"
+
perrors "github.com/pkg/errors"
)
@@ -211,7 +213,7 @@ func (r *consulRegistry) Destroy() {
select {
case <-done:
logger.Infof("consulRegistry unregister done")
- case <-getty.GetTimeWheel().After(registryDestroyDefaultTimeout):
+ case <-gxtime.After(registryDestroyDefaultTimeout):
logger.Errorf("consul unregister timeout")
}
diff --git a/registry/etcdv3/listener_test.go b/registry/etcdv3/listener_test.go
index ff7f63f..12c3c50 100644
--- a/registry/etcdv3/listener_test.go
+++ b/registry/etcdv3/listener_test.go
@@ -24,8 +24,8 @@ import (
)
import (
- "github.com/apache/dubbo-getty"
"github.com/coreos/etcd/embed"
+ gxtime "github.com/dubbogo/gost/time"
"github.com/stretchr/testify/suite"
)
@@ -57,7 +57,7 @@ func (suite *RegistryTestSuite) SetupSuite() {
select {
case <-e.Server.ReadyNotify():
t.Log("Server is ready!")
- case <-getty.GetTimeWheel().After(60 * time.Second):
+ case <-gxtime.After(60 * time.Second):
e.Server.Stop() // trigger a shutdown
t.Logf("Server took too long to start!")
}
diff --git a/registry/kubernetes/listener.go b/registry/kubernetes/listener.go
index ba3326b..de23415 100644
--- a/registry/kubernetes/listener.go
+++ b/registry/kubernetes/listener.go
@@ -23,6 +23,7 @@ import (
import (
gxchan "github.com/dubbogo/gost/container/chan"
+
perrors "github.com/pkg/errors"
)
diff --git a/registry/kubernetes/registry.go b/registry/kubernetes/registry.go
index a26478f..3767075 100644
--- a/registry/kubernetes/registry.go
+++ b/registry/kubernetes/registry.go
@@ -25,8 +25,10 @@ import (
)
import (
- getty "github.com/apache/dubbo-getty"
+ gxtime "github.com/dubbogo/gost/time"
+
perrors "github.com/pkg/errors"
+
v1 "k8s.io/api/core/v1"
)
@@ -46,8 +48,6 @@ const (
)
func init() {
- //processID = fmt.Sprintf("%d", os.Getpid())
- //localIP = common.GetLocalIp()
extension.SetRegistry(Name, newKubernetesRegistry)
}
@@ -221,7 +221,7 @@ LOOP:
case <-r.Done():
logger.Warnf("(KubernetesProviderRegistry)reconnectKubernetes Registry
goroutine exit now...")
break LOOP
- case
<-getty.GetTimeWheel().After(timeSecondDuration(failTimes * ConnDelay)): //
avoid connect frequent
+ case
<-gxtime.After(timeSecondDuration(failTimes * ConnDelay)): // avoid connect
frequent
}
err = kubernetes.ValidateClient(r)
logger.Infof("Kubernetes
ProviderRegistry.validateKubernetesClient = error{%#v}", perrors.WithStack(err))
diff --git a/registry/kubernetes/registry_test.go
b/registry/kubernetes/registry_test.go
index a816b03..d63d338 100644
--- a/registry/kubernetes/registry_test.go
+++ b/registry/kubernetes/registry_test.go
@@ -28,6 +28,7 @@ import (
import (
"github.com/stretchr/testify/assert"
+
v1 "k8s.io/api/core/v1"
)
diff --git a/remoting/etcdv3/client.go b/remoting/etcdv3/client.go
index 3027d02..d692e19 100644
--- a/remoting/etcdv3/client.go
+++ b/remoting/etcdv3/client.go
@@ -26,7 +26,9 @@ import (
import (
"github.com/coreos/etcd/clientv3"
"github.com/coreos/etcd/clientv3/concurrency"
+
perrors "github.com/pkg/errors"
+
"google.golang.org/grpc"
)
diff --git a/remoting/etcdv3/client_test.go b/remoting/etcdv3/client_test.go
index 787c24d..0746307 100644
--- a/remoting/etcdv3/client_test.go
+++ b/remoting/etcdv3/client_test.go
@@ -31,9 +31,12 @@ import (
import (
"github.com/coreos/etcd/embed"
"github.com/coreos/etcd/mvcc/mvccpb"
+
perrors "github.com/pkg/errors"
+
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/suite"
+
"google.golang.org/grpc/connectivity"
)
diff --git a/remoting/etcdv3/facade.go b/remoting/etcdv3/facade.go
index ecda00b..a4c814d 100644
--- a/remoting/etcdv3/facade.go
+++ b/remoting/etcdv3/facade.go
@@ -23,7 +23,8 @@ import (
)
import (
- "github.com/apache/dubbo-getty"
+ gxtime "github.com/dubbogo/gost/time"
+
perrors "github.com/pkg/errors"
)
@@ -75,7 +76,7 @@ LOOP:
case <-r.Done():
logger.Warnf("(ETCDV3ProviderRegistry)reconnectETCDRegistry goroutine exit
now...")
break LOOP
- case
<-getty.GetTimeWheel().After(timeSecondDuration(failTimes * ConnDelay)): //
avoid connect frequent
+ case
<-gxtime.After(timeSecondDuration(failTimes * ConnDelay)): // avoid connect
frequent
}
err = ValidateClient(
r,
diff --git a/remoting/etcdv3/listener.go b/remoting/etcdv3/listener.go
index fd6f958..c1ad602 100644
--- a/remoting/etcdv3/listener.go
+++ b/remoting/etcdv3/listener.go
@@ -25,6 +25,7 @@ import (
import (
"github.com/coreos/etcd/clientv3"
"github.com/coreos/etcd/mvcc/mvccpb"
+
perrors "github.com/pkg/errors"
)
diff --git a/remoting/getty/config.go b/remoting/getty/config.go
index 9cdadae..20bfa2a 100644
--- a/remoting/getty/config.go
+++ b/remoting/getty/config.go
@@ -39,7 +39,6 @@ type (
keepAlivePeriod time.Duration
TcpRBufSize int `default:"262144" yaml:"tcp_r_buf_size"
json:"tcp_r_buf_size,omitempty"`
TcpWBufSize int `default:"65536" yaml:"tcp_w_buf_size"
json:"tcp_w_buf_size,omitempty"`
- PkgWQSize int `default:"1024" yaml:"pkg_wq_size"
json:"pkg_wq_size,omitempty"`
TcpReadTimeout string `default:"1s" yaml:"tcp_read_timeout"
json:"tcp_read_timeout,omitempty"`
tcpReadTimeout time.Duration
TcpWriteTimeout string `default:"5s" yaml:"tcp_write_timeout"
json:"tcp_write_timeout,omitempty"`
@@ -67,7 +66,7 @@ type (
sessionTimeout time.Duration
SessionNumber int `default:"1000" yaml:"session_number"
json:"session_number,omitempty"`
- // grpool
+ // gr pool
GrPoolSize int `default:"0" yaml:"gr_pool_size"
json:"gr_pool_size,omitempty"`
QueueLen int `default:"0" yaml:"queue_len"
json:"queue_len,omitempty"`
QueueNumber int `default:"0" yaml:"queue_number"
json:"queue_number,omitempty"`
@@ -95,7 +94,7 @@ type (
SessionTimeout string `default:"60s" yaml:"session_timeout"
json:"session_timeout,omitempty"`
sessionTimeout time.Duration
- // grpool
+ // gr pool
GrPoolSize int `default:"0" yaml:"gr_pool_size"
json:"gr_pool_size,omitempty"`
QueueLen int `default:"0" yaml:"queue_len"
json:"queue_len,omitempty"`
QueueNumber int `default:"0" yaml:"queue_number"
json:"queue_number,omitempty"`
@@ -122,7 +121,6 @@ func GetDefaultClientConfig() ClientConfig {
KeepAlivePeriod: "180s",
TcpRBufSize: 262144,
TcpWBufSize: 65536,
- PkgWQSize: 512,
TcpReadTimeout: "1s",
TcpWriteTimeout: "5s",
WaitTimeout: "1s",
@@ -146,7 +144,6 @@ func GetDefaultServerConfig() ServerConfig {
KeepAlivePeriod: "180s",
TcpRBufSize: 262144,
TcpWBufSize: 65536,
- PkgWQSize: 512,
TcpReadTimeout: "1s",
TcpWriteTimeout: "5s",
WaitTimeout: "1s",
diff --git a/remoting/getty/dubbo_codec_for_test.go
b/remoting/getty/dubbo_codec_for_test.go
index 9afc18a..ec41440 100644
--- a/remoting/getty/dubbo_codec_for_test.go
+++ b/remoting/getty/dubbo_codec_for_test.go
@@ -26,6 +26,7 @@ import (
import (
hessian "github.com/apache/dubbo-go-hessian2"
+
perrors "github.com/pkg/errors"
)
@@ -48,7 +49,7 @@ type DubboTestCodec struct {
// encode request for transport
func (c *DubboTestCodec) EncodeRequest(request *remoting.Request)
(*bytes.Buffer, error) {
if request.Event {
- return c.encodeHeartbeartReqeust(request)
+ return c.encodeHeartbeatRequest(request)
}
invoc, ok := request.Data.(*invocation.RPCInvocation)
@@ -100,7 +101,7 @@ func (c *DubboTestCodec) EncodeRequest(request
*remoting.Request) (*bytes.Buffer
}
// encode heartbeat request
-func (c *DubboTestCodec) encodeHeartbeartReqeust(request *remoting.Request)
(*bytes.Buffer, error) {
+func (c *DubboTestCodec) encodeHeartbeatRequest(request *remoting.Request)
(*bytes.Buffer, error) {
header := impl.DubboHeader{
Type: impl.PackageHeartbeat,
SerialID: constant.S_Hessian2,
diff --git a/remoting/getty/getty_client.go b/remoting/getty/getty_client.go
index c7bc802..7082773 100644
--- a/remoting/getty/getty_client.go
+++ b/remoting/getty/getty_client.go
@@ -29,9 +29,14 @@ import (
import (
"github.com/apache/dubbo-getty"
+
gxsync "github.com/dubbogo/gost/sync"
+ gxtime "github.com/dubbogo/gost/time"
+
perrors "github.com/pkg/errors"
+
uatomic "go.uber.org/atomic"
+
"gopkg.in/yaml.v2"
)
@@ -205,7 +210,7 @@ func (c *Client) Request(request *remoting.Request, timeout
time.Duration, respo
}
select {
- case <-getty.GetTimeWheel().After(timeout):
+ case <-gxtime.After(timeout):
return perrors.WithStack(errClientReadTimeout)
case <-response.Done:
err = response.Err
diff --git a/remoting/getty/getty_client_test.go
b/remoting/getty/getty_client_test.go
index a03fd52..8115df6 100644
--- a/remoting/getty/getty_client_test.go
+++ b/remoting/getty/getty_client_test.go
@@ -28,30 +28,59 @@ import (
import (
hessian "github.com/apache/dubbo-go-hessian2"
+
perrors "github.com/pkg/errors"
+
"github.com/stretchr/testify/assert"
)
import (
"github.com/apache/dubbo-go/common"
. "github.com/apache/dubbo-go/common/constant"
- "github.com/apache/dubbo-go/common/proxy/proxy_factory"
- "github.com/apache/dubbo-go/config"
"github.com/apache/dubbo-go/protocol"
"github.com/apache/dubbo-go/protocol/invocation"
"github.com/apache/dubbo-go/remoting"
)
func TestRunSuite(t *testing.T) {
- svr, url := InitTest(t)
- client := getClient(url)
+ initTestEnvironment(t)
+ userUrl := initUserUrl(t)
+ server := getServer(userUrl)
+ client := getClient(userUrl)
assert.NotNil(t, client)
+
testRequestOneWay(t, client)
- testClient_Call(t, client)
- testClient_AsyncCall(t, client)
- svr.Stop()
+ testClientCall(t, client)
+ testClientAsyncCall(t, client)
+ server.Stop()
}
+//////////////////////////////////
+// init special url
+//////////////////////////////////
+
+func initUserUrl(t *testing.T) *common.URL {
+ hessian.RegisterPOJO(&User{})
+ remoting.RegistryCodec("dubbo", &DubboTestCodec{})
+
+ methods, err :=
common.ServiceMap.Register("com.ikurento.user.UserProvider", "dubbo", "", "",
&UserProvider{})
+ assert.NoError(t, err)
+ assert.Equal(t,
"GetBigPkg,GetUser,GetUser0,GetUser1,GetUser2,GetUser3,GetUser4,GetUser5,GetUser6",
methods)
+
+ url, err :=
common.NewURL("dubbo://127.0.0.1:20060/com.ikurento.user.UserProvider?anyhost=true&"
+
+
"application=BDTService&category=providers&default.timeout=10000&dubbo=dubbo-provider-golang-1.0.0&"
+
+
"environment=dev&interface=com.ikurento.user.UserProvider&ip=127.0.0.1&methods=GetUser%2C&"
+
+
"module=dubbogo+user-info+server&org=ikurento.com&owner=ZX&pid=1447&revision=0.0.1&"
+
+
"side=provider&timeout=3000&timestamp=1556509797245&bean.name=UserProvider")
+ assert.NoError(t, err)
+
+ return url
+}
+
+//////////////////////////////////
+// test cases
+//////////////////////////////////
+
func testRequestOneWay(t *testing.T, client *Client) {
request := remoting.NewRequest("2.0.2")
@@ -66,32 +95,7 @@ func testRequestOneWay(t *testing.T, client *Client) {
assert.NoError(t, err)
}
-func createInvocation(methodName string, callback interface{}, reply
interface{}, arguments []interface{},
- parameterValues []reflect.Value) *invocation.RPCInvocation {
- return
invocation.NewRPCInvocationWithOptions(invocation.WithMethodName(methodName),
- invocation.WithArguments(arguments),
invocation.WithReply(reply),
- invocation.WithCallBack(callback),
invocation.WithParameterValues(parameterValues))
-}
-
-func setAttachment(invocation *invocation.RPCInvocation, attachments
map[string]string) {
- for key, value := range attachments {
- invocation.SetAttachments(key, value)
- }
-}
-
-func getClient(url *common.URL) *Client {
- client := NewClient(Options{
- ConnectTimeout: config.GetConsumerConfig().ConnectTimeout,
- })
-
- if err := client.Connect(url); err != nil {
- return nil
- }
- return client
-}
-
-func testClient_Call(t *testing.T, c *Client) {
-
+func testClientCall(t *testing.T, c *Client) {
testGetBigPkg(t, c)
testGetUser(t, c)
testGetUser0(t, c)
@@ -308,7 +312,7 @@ func testGetUser61(t *testing.T, c *Client) {
assert.Equal(t, User{Id: "1", Name: ""}, *user)
}
-func testClient_AsyncCall(t *testing.T, client *Client) {
+func testClientAsyncCall(t *testing.T, client *Client) {
user := &User{}
wg := sync.WaitGroup{}
request := remoting.NewRequest("2.0.2")
@@ -336,86 +340,6 @@ func testClient_AsyncCall(t *testing.T, client *Client) {
wg.Wait()
}
-func InitTest(t *testing.T) (*Server, *common.URL) {
-
- hessian.RegisterPOJO(&User{})
- remoting.RegistryCodec("dubbo", &DubboTestCodec{})
-
- methods, err :=
common.ServiceMap.Register("com.ikurento.user.UserProvider", "dubbo", "", "",
&UserProvider{})
- assert.NoError(t, err)
- assert.Equal(t,
"GetBigPkg,GetUser,GetUser0,GetUser1,GetUser2,GetUser3,GetUser4,GetUser5,GetUser6",
methods)
-
- // config
- SetClientConf(ClientConfig{
- ConnectionNum: 2,
- HeartbeatPeriod: "5s",
- SessionTimeout: "20s",
- GettySessionParam: GettySessionParam{
- CompressEncoding: false,
- TcpNoDelay: true,
- TcpKeepAlive: true,
- KeepAlivePeriod: "120s",
- TcpRBufSize: 262144,
- TcpWBufSize: 65536,
- PkgWQSize: 512,
- TcpReadTimeout: "4s",
- TcpWriteTimeout: "5s",
- WaitTimeout: "1s",
- MaxMsgLen: 10240000000,
- SessionName: "client",
- },
- })
- assert.NoError(t, clientConf.CheckValidity())
- SetServerConfig(ServerConfig{
- SessionNumber: 700,
- SessionTimeout: "20s",
- GettySessionParam: GettySessionParam{
- CompressEncoding: false,
- TcpNoDelay: true,
- TcpKeepAlive: true,
- KeepAlivePeriod: "120s",
- TcpRBufSize: 262144,
- TcpWBufSize: 65536,
- PkgWQSize: 512,
- TcpReadTimeout: "1s",
- TcpWriteTimeout: "5s",
- WaitTimeout: "1s",
- MaxMsgLen: 10240000000,
- SessionName: "server",
- }})
- assert.NoError(t, srvConf.CheckValidity())
-
- url, err :=
common.NewURL("dubbo://127.0.0.1:20060/com.ikurento.user.UserProvider?anyhost=true&"
+
-
"application=BDTService&category=providers&default.timeout=10000&dubbo=dubbo-provider-golang-1.0.0&"
+
-
"environment=dev&interface=com.ikurento.user.UserProvider&ip=127.0.0.1&methods=GetUser%2C&"
+
-
"module=dubbogo+user-info+server&org=ikurento.com&owner=ZX&pid=1447&revision=0.0.1&"
+
-
"side=provider&timeout=3000&timestamp=1556509797245&bean.name=UserProvider")
- assert.NoError(t, err)
- // init server
- userProvider := &UserProvider{}
- _, err = common.ServiceMap.Register("", url.Protocol, "", "0.0.1",
userProvider)
- assert.NoError(t, err)
- invoker := &proxy_factory.ProxyInvoker{
- BaseInvoker: *protocol.NewBaseInvoker(url),
- }
- handler := func(invocation *invocation.RPCInvocation)
protocol.RPCResult {
- //result := protocol.RPCResult{}
- r := invoker.Invoke(context.Background(), invocation)
- result := protocol.RPCResult{
- Err: r.Error(),
- Rest: r.Result(),
- Attrs: r.Attachments(),
- }
- return result
- }
- server := NewServer(url, handler)
- server.Start()
-
- time.Sleep(time.Second * 2)
-
- return server, url
-}
-
//////////////////////////////////
// provider
//////////////////////////////////
@@ -450,7 +374,7 @@ func (u *UserProvider) GetUser(ctx context.Context, req
[]interface{}, rsp *User
}
func (u *UserProvider) GetUser0(id string, k *User, name string) (User, error)
{
- // fix testClient_AsyncCall assertion
+ // fix testClientAsyncCall assertion
time.Sleep(1 * time.Second)
return User{Id: id, Name: name}, nil
}
diff --git a/remoting/getty/getty_server.go b/remoting/getty/getty_server.go
index 4930a6a..2921d6c 100644
--- a/remoting/getty/getty_server.go
+++ b/remoting/getty/getty_server.go
@@ -25,8 +25,11 @@ import (
import (
"github.com/apache/dubbo-getty"
+
gxsync "github.com/dubbogo/gost/sync"
+
perrors "github.com/pkg/errors"
+
"gopkg.in/yaml.v2"
)
diff --git a/remoting/getty/listener.go b/remoting/getty/listener.go
index f316b26..512a82c 100644
--- a/remoting/getty/listener.go
+++ b/remoting/getty/listener.go
@@ -26,7 +26,11 @@ import (
import (
"github.com/apache/dubbo-getty"
+
hessian "github.com/apache/dubbo-go-hessian2"
+
+ gxtime "github.com/dubbogo/gost/time"
+
perrors "github.com/pkg/errors"
)
@@ -379,7 +383,7 @@ func heartbeat(session getty.Session, timeout
time.Duration, callBack func(err e
go func() {
var err1 error
select {
- case <-getty.GetTimeWheel().After(timeout):
+ case <-gxtime.After(timeout):
err1 = errHeartbeatReadTimeout
case <-resp.Done:
err1 = resp.Err
diff --git a/remoting/getty/listener_test.go b/remoting/getty/listener_test.go
index 2700ed8..4f4a085 100644
--- a/remoting/getty/listener_test.go
+++ b/remoting/getty/listener_test.go
@@ -25,6 +25,7 @@ import (
import (
"github.com/opentracing/opentracing-go"
"github.com/opentracing/opentracing-go/mocktracer"
+
"github.com/stretchr/testify/assert"
)
diff --git a/remoting/getty/opentracing.go b/remoting/getty/opentracing.go
index 4ba4fde..d0ab1bc 100644
--- a/remoting/getty/opentracing.go
+++ b/remoting/getty/opentracing.go
@@ -20,6 +20,7 @@ package getty
import (
"github.com/opentracing/opentracing-go"
)
+
import (
invocation_impl "github.com/apache/dubbo-go/protocol/invocation"
)
diff --git a/remoting/getty/readwriter.go b/remoting/getty/readwriter.go
index 61062df..6aa4fb0 100644
--- a/remoting/getty/readwriter.go
+++ b/remoting/getty/readwriter.go
@@ -24,7 +24,9 @@ import (
import (
"github.com/apache/dubbo-getty"
+
hessian "github.com/apache/dubbo-go-hessian2"
+
perrors "github.com/pkg/errors"
)
@@ -33,10 +35,6 @@ import (
"github.com/apache/dubbo-go/remoting"
)
-////////////////////////////////////////////
-// RpcClientPackageHandler
-////////////////////////////////////////////
-
// RpcClientPackageHandler Read data from server and Write data to server
type RpcClientPackageHandler struct {
client *Client
diff --git a/remoting/getty/readwriter_test.go
b/remoting/getty/readwriter_test.go
index a0ad243..eb4891d 100644
--- a/remoting/getty/readwriter_test.go
+++ b/remoting/getty/readwriter_test.go
@@ -25,6 +25,7 @@ import (
import (
hessian "github.com/apache/dubbo-go-hessian2"
+
"github.com/stretchr/testify/assert"
)
@@ -32,6 +33,7 @@ import (
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/common/proxy/proxy_factory"
+ "github.com/apache/dubbo-go/config"
"github.com/apache/dubbo-go/protocol"
"github.com/apache/dubbo-go/protocol/dubbo/impl"
"github.com/apache/dubbo-go/protocol/invocation"
@@ -39,54 +41,24 @@ import (
)
func TestTCPPackageHandle(t *testing.T) {
- svr, url := getServer(t)
- client := getClient(url)
- testDecodeTCPPackage(t, svr, client)
- svr.Stop()
-}
-
-func testDecodeTCPPackage(t *testing.T, svr *Server, client *Client) {
- request := remoting.NewRequest("2.0.2")
- ap := &AdminProvider{}
- rpcInvocation := createInvocation("GetAdmin", nil, nil,
[]interface{}{[]interface{}{"1", "username"}},
- []reflect.Value{reflect.ValueOf([]interface{}{"1",
"username"}), reflect.ValueOf(ap)})
- attachment := map[string]string{
- constant.INTERFACE_KEY: "com.ikurento.user.AdminProvider",
- constant.PATH_KEY: "AdminProvider",
- constant.VERSION_KEY: "1.0.0",
- }
- setAttachment(rpcInvocation, attachment)
- request.Data = rpcInvocation
- request.Event = false
- request.TwoWay = false
+ initTestEnvironment(t)
+ adminUrl := initAdminUrl(t)
+ server := getServer(adminUrl)
+ client := getClient(adminUrl)
- pkgWriteHandler := NewRpcClientPackageHandler(client)
- pkgBytes, err := pkgWriteHandler.Write(nil, request)
- assert.NoError(t, err)
- pkgReadHandler := NewRpcServerPackageHandler(svr)
- _, pkgLen, err := pkgReadHandler.Read(nil, pkgBytes)
- assert.NoError(t, err)
- assert.Equal(t, pkgLen, len(pkgBytes))
-
- // simulate incomplete tcp package
- incompletePkgLen := len(pkgBytes) - 10
- assert.True(t, incompletePkgLen >= impl.HEADER_LENGTH, "header buffer
too short")
- incompletePkg := pkgBytes[0 : incompletePkgLen-1]
- pkg, pkgLen, err := pkgReadHandler.Read(nil, incompletePkg)
- assert.NoError(t, err)
- assert.Equal(t, pkg, nil)
- assert.Equal(t, pkgLen, 0)
+ testDecodeTCPPackage(t, server, client)
+ server.Stop()
}
-func getServer(t *testing.T) (*Server, *common.URL) {
- hessian.RegisterPOJO(&User{})
- remoting.RegistryCodec("dubbo", &DubboTestCodec{})
+//////////////////////////////////
+// before execute getty_test
+// 1. init config
+// 2. init url
+// 3. init server
+// 4. init client
+//////////////////////////////////
- methods, err :=
common.ServiceMap.Register("com.ikurento.user.AdminProvider", "dubbo", "", "",
&AdminProvider{})
- assert.NoError(t, err)
- assert.Equal(t, "GetAdmin", methods)
-
- // config
+func initTestEnvironment(t *testing.T) {
SetClientConf(ClientConfig{
ConnectionNum: 2,
HeartbeatPeriod: "5s",
@@ -98,7 +70,6 @@ func getServer(t *testing.T) (*Server, *common.URL) {
KeepAlivePeriod: "120s",
TcpRBufSize: 262144,
TcpWBufSize: 65536,
- PkgWQSize: 512,
TcpReadTimeout: "4s",
TcpWriteTimeout: "5s",
WaitTimeout: "1s",
@@ -117,7 +88,6 @@ func getServer(t *testing.T) (*Server, *common.URL) {
KeepAlivePeriod: "120s",
TcpRBufSize: 262144,
TcpWBufSize: 65536,
- PkgWQSize: 512,
TcpReadTimeout: "1s",
TcpWriteTimeout: "5s",
WaitTimeout: "1s",
@@ -125,17 +95,27 @@ func getServer(t *testing.T) (*Server, *common.URL) {
SessionName: "server",
}})
assert.NoError(t, srvConf.CheckValidity())
+}
+
+func initAdminUrl(t *testing.T) *common.URL {
+ hessian.RegisterPOJO(&User{})
+ remoting.RegistryCodec("dubbo", &DubboTestCodec{})
- url, err :=
common.NewURL("dubbo://127.0.0.1:20061/com.ikurento.user.AdminProvider?anyhost=true&"
+
+ url, err := common.NewURL("dubbo://127.0.0.1:20061/?anyhost=true&" +
"application=BDTService&category=providers&default.timeout=10000&dubbo=dubbo-provider-golang-1.0.0&"
+
"environment=dev&interface=com.ikurento.user.AdminProvider&ip=127.0.0.1&methods=GetAdmin%2C&"
+
"module=dubbogo+user-info+server&org=ikurento.com&owner=ZX&pid=1447&revision=0.0.1&"
+
"side=provider&timeout=3000&timestamp=1556509797245&bean.name=AdminProvider")
assert.NoError(t, err)
- // init server
- adminProvider := &AdminProvider{}
- _, err = common.ServiceMap.Register("com.ikurento.user.AdminProvider",
url.Protocol, "", "0.0.1", adminProvider)
+
+ methods, err :=
common.ServiceMap.Register("com.ikurento.user.AdminProvider", url.Protocol, "",
"", &AdminProvider{})
assert.NoError(t, err)
+ assert.Equal(t, "GetAdmin", methods)
+
+ return url
+}
+
+func getServer(url *common.URL) *Server {
invoker := &proxy_factory.ProxyInvoker{
BaseInvoker: *protocol.NewBaseInvoker(url),
}
@@ -154,9 +134,78 @@ func getServer(t *testing.T) (*Server, *common.URL) {
time.Sleep(time.Second * 2)
- return server, url
+ return server
}
+func getClient(url *common.URL) *Client {
+ client := NewClient(Options{
+ ConnectTimeout: config.GetConsumerConfig().ConnectTimeout,
+ })
+
+ if err := client.Connect(url); err != nil {
+ return nil
+ }
+ return client
+}
+
+//////////////////////////////////
+// test util
+//////////////////////////////////
+
+func createInvocation(methodName string, callback interface{}, reply
interface{}, arguments []interface{},
+ parameterValues []reflect.Value) *invocation.RPCInvocation {
+ return
invocation.NewRPCInvocationWithOptions(invocation.WithMethodName(methodName),
+ invocation.WithArguments(arguments),
invocation.WithReply(reply),
+ invocation.WithCallBack(callback),
invocation.WithParameterValues(parameterValues))
+}
+
+func setAttachment(invocation *invocation.RPCInvocation, attachments
map[string]string) {
+ for key, value := range attachments {
+ invocation.SetAttachments(key, value)
+ }
+}
+
+//////////////////////////////////
+// test cases
+//////////////////////////////////
+
+func testDecodeTCPPackage(t *testing.T, svr *Server, client *Client) {
+ request := remoting.NewRequest("2.0.2")
+ ap := &AdminProvider{}
+ rpcInvocation := createInvocation("GetAdmin", nil, nil,
[]interface{}{[]interface{}{"1", "username"}},
+ []reflect.Value{reflect.ValueOf([]interface{}{"1",
"username"}), reflect.ValueOf(ap)})
+ attachment := map[string]string{
+ constant.INTERFACE_KEY: "com.ikurento.user.AdminProvider",
+ constant.PATH_KEY: "AdminProvider",
+ constant.VERSION_KEY: "1.0.0",
+ }
+ setAttachment(rpcInvocation, attachment)
+ request.Data = rpcInvocation
+ request.Event = false
+ request.TwoWay = false
+
+ pkgWriteHandler := NewRpcClientPackageHandler(client)
+ pkgBytes, err := pkgWriteHandler.Write(nil, request)
+ assert.NoError(t, err)
+ pkgReadHandler := NewRpcServerPackageHandler(svr)
+ _, pkgLen, err := pkgReadHandler.Read(nil, pkgBytes)
+ assert.NoError(t, err)
+ assert.Equal(t, pkgLen, len(pkgBytes))
+
+ // simulate incomplete tcp package
+ incompletePkgLen := len(pkgBytes) - 10
+ assert.True(t, incompletePkgLen >= impl.HEADER_LENGTH, "header buffer
too short")
+ incompletePkg := pkgBytes[0 : incompletePkgLen-1]
+ pkg, pkgLen, err := pkgReadHandler.Read(nil, incompletePkg)
+ assert.NoError(t, err)
+ assert.Equal(t, pkg, nil)
+ assert.Equal(t, pkgLen, 0)
+}
+
+//////////////////////////////////
+// provider
+//////////////////////////////////
+
type AdminProvider struct {
}
diff --git a/remoting/zookeeper/client.go b/remoting/zookeeper/client.go
index bbc9b0a..110dee6 100644
--- a/remoting/zookeeper/client.go
+++ b/remoting/zookeeper/client.go
@@ -24,6 +24,7 @@ import (
import (
gxzookeeper "github.com/dubbogo/gost/database/kv/zk"
+
perrors "github.com/pkg/errors"
)
diff --git a/remoting/zookeeper/curator_discovery/service_discovery.go b/remoting/zookeeper/curator_discovery/service_discovery.go
index ebe784c..fee6325 100644
--- a/remoting/zookeeper/curator_discovery/service_discovery.go
+++ b/remoting/zookeeper/curator_discovery/service_discovery.go
@@ -26,7 +26,9 @@ import (
import (
"github.com/dubbogo/go-zookeeper/zk"
+
gxzookeeper "github.com/dubbogo/gost/database/kv/zk"
+
perrors "github.com/pkg/errors"
)
diff --git a/remoting/zookeeper/facade.go b/remoting/zookeeper/facade.go
index d735481..f66a2c9 100644
--- a/remoting/zookeeper/facade.go
+++ b/remoting/zookeeper/facade.go
@@ -21,9 +21,11 @@ import (
"sync"
"time"
)
+
import (
gxzookeeper "github.com/dubbogo/gost/database/kv/zk"
)
+
import (
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/logger"
diff --git a/remoting/zookeeper/listener.go b/remoting/zookeeper/listener.go
index cedf6e5..d733790 100644
--- a/remoting/zookeeper/listener.go
+++ b/remoting/zookeeper/listener.go
@@ -25,10 +25,13 @@ import (
)
import (
- getty "github.com/apache/dubbo-getty"
"github.com/dubbogo/go-zookeeper/zk"
+
gxzookeeper "github.com/dubbogo/gost/database/kv/zk"
+ gxtime "github.com/dubbogo/gost/time"
+
perrors "github.com/pkg/errors"
+
uatomic "go.uber.org/atomic"
)
@@ -253,7 +256,7 @@ func (l *ZkEventListener) listenDirEvent(conf *common.URL, zkPath string, listen
return
}
select {
- case
<-getty.GetTimeWheel().After(timeSecondDuration(failTimes * ConnDelay)):
+ case <-gxtime.After(timeSecondDuration(failTimes *
ConnDelay)):
l.client.UnregisterEvent(zkPath, &event)
continue
case <-l.exit:
diff --git a/test/integrate/dubbo/go-client/client.yml b/test/integrate/dubbo/go-client/client.yml
index df44a13..411dfe0 100644
--- a/test/integrate/dubbo/go-client/client.yml
+++ b/test/integrate/dubbo/go-client/client.yml
@@ -53,7 +53,6 @@ protocol_conf:
tcp_r_buf_size: 262144
tcp_w_buf_size: 65536
pkg_rq_size: 1024
- pkg_wq_size: 512
tcp_read_timeout: "1s"
tcp_write_timeout: "5s"
wait_timeout: "1s"
diff --git a/test/integrate/dubbo/go-server/server.yml b/test/integrate/dubbo/go-server/server.yml
index 8a17297..04d5b8f 100644
--- a/test/integrate/dubbo/go-server/server.yml
+++ b/test/integrate/dubbo/go-server/server.yml
@@ -49,7 +49,6 @@ protocol_conf:
tcp_r_buf_size: 262144
tcp_w_buf_size: 65536
pkg_rq_size: 1024
- pkg_wq_size: 512
tcp_read_timeout: "1s"
tcp_write_timeout: "5s"
wait_timeout: "1s"
diff --git a/tools/cli/example/server/config/server.yml b/tools/cli/example/server/config/server.yml
index 425eb1c..7603b15 100644
--- a/tools/cli/example/server/config/server.yml
+++ b/tools/cli/example/server/config/server.yml
@@ -50,7 +50,6 @@ protocol_conf:
tcp_r_buf_size: 262144
tcp_w_buf_size: 65536
pkg_rq_size: 1024
- pkg_wq_size: 512
tcp_read_timeout: "1s"
tcp_write_timeout: "5s"
wait_timeout: "1s"