This is an automated email from the ASF dual-hosted git repository.
baerwang pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/dubbo-go.git
The following commit(s) were added to refs/heads/develop by this push:
new 7a9b5c7f5 feat:add keepalive config for triple portocol (#2757)
7a9b5c7f5 is described below
commit 7a9b5c7f5bc8b4530a3a2f276ccb278d9fedfb14
Author: xinfan.wu <[email protected]>
AuthorDate: Fri Dec 6 14:15:33 2024 +0800
feat:add keepalive config for triple portocol (#2757)
* fix:set url to nil when destroy base invoker and change the order of
related references to avoid panic
* add comment at BaseInvoker.Destroy()
* feat:add keepalive config and pass through to http2 transport
* chore:update comments for clarity
* chore:update comments for clarity
* feat:add keepalive parameters to dubbogo/triple and preallocate the
config options to avoid memory copying and allocation caused by dynamic
expansion of slices.
* feat:add keepalive parameters, default interval is 10s, and timeout value
is 20s
* fix:golang lint ci error
* fix:typo
* refactor:add space
* refactor:add space for comment
* refactor:add space for comment
---
client/action.go | 2 +
client/options.go | 19 +++++++
client/options_test.go | 38 +++++++++++++
common/constant/default.go | 5 ++
common/constant/key.go | 2 +
common/url.go | 2 +-
global/reference_config.go | 110 +++++++++++++++++++++++---------------
go.mod | 2 +-
go.sum | 4 +-
protocol/triple/client.go | 8 ++-
protocol/triple/dubbo3_invoker.go | 5 ++
11 files changed, 147 insertions(+), 50 deletions(-)
diff --git a/client/action.go b/client/action.go
index b5a0fafd6..cd4ebd149 100644
--- a/client/action.go
+++ b/client/action.go
@@ -127,6 +127,8 @@ func (refOpts *ReferenceOptions) refer(srv
common.RPCService, info *ClientInfo)
common.WithParams(refOpts.getURLMap()),
common.WithParamsValue(constant.BeanNameKey, refOpts.id),
common.WithParamsValue(constant.MetadataTypeKey,
refOpts.metaDataType),
+ common.WithParamsValue(constant.KeepAliveInterval,
ref.KeepAliveInterval),
+ common.WithParamsValue(constant.KeepAliveTimeout,
ref.KeepAliveTimeout),
)
if info != nil {
cfgURL.SetAttribute(constant.ClientInfoKey, info)
diff --git a/client/options.go b/client/options.go
index ccb81f0df..28610e24e 100644
--- a/client/options.go
+++ b/client/options.go
@@ -638,6 +638,25 @@ func WithClientClusterStrategy(strategy string)
ClientOption {
}
}
+// If there is no other traffic on the connection, the ping will be sent, only
works for 'tri' protocol with http2.
+// A minimum value of 10s will be used instead to invoid 'too many pings'.If
not set, default value is 10s.
+func WithKeepAliveInterval(keepAliveInterval time.Duration) ClientOption {
+ if keepAliveInterval < constant.MinKeepAliveInterval {
+ keepAliveInterval = constant.MinKeepAliveInterval
+ }
+ return func(opts *ClientOptions) {
+ opts.overallReference.KeepAliveInterval =
keepAliveInterval.String()
+ }
+}
+
+// WithKeepAliveTimeout is timeout after which the connection will be closed,
only works for 'tri' protocol with http2
+// If not set, default value is 20s.
+func WithKeepAliveTimeout(keepAliveTimeout time.Duration) ClientOption {
+ return func(opts *ClientOptions) {
+ opts.overallReference.KeepAliveTimeout =
keepAliveTimeout.String()
+ }
+}
+
// ========== LoadBalance Strategy ==========
func WithClientLoadBalanceConsistentHashing() ClientOption {
diff --git a/client/options_test.go b/client/options_test.go
index 66d7d9d0d..4983e2d23 100644
--- a/client/options_test.go
+++ b/client/options_test.go
@@ -1193,3 +1193,41 @@ func TestWithMeshProviderPort(t *testing.T) {
}
processReferenceOptionsInitCases(t, cases)
}
+
+func TestWithKeepAliveConfig(t *testing.T) {
+ cases := []newClientCase{
+ {
+ desc: "config keepalive interval with less than 10s",
+ opts: []ClientOption{
+ WithKeepAliveInterval(time.Second * 5), //less
than 10s(min ping interval),should be set to 10s
+ },
+ verify: func(t *testing.T, cli *Client, err error) {
+ assert.Nil(t, err)
+ assert.Equal(t, "10s",
cli.cliOpts.overallReference.KeepAliveInterval)
+ },
+ },
+ {
+ desc: "config keepalive interval with larger than 10s",
+ opts: []ClientOption{
+ WithKeepAliveInterval(time.Second * 20),
+ },
+ verify: func(t *testing.T, cli *Client, err error) {
+ assert.Nil(t, err)
+ assert.Equal(t, "20s",
cli.cliOpts.overallReference.KeepAliveInterval)
+ },
+ },
+ {
+ desc: "config keepalive interval and timeout",
+ opts: []ClientOption{
+ WithKeepAliveInterval(time.Second * 20),
+ WithKeepAliveTimeout(time.Second * 30),
+ },
+ verify: func(t *testing.T, cli *Client, err error) {
+ assert.Nil(t, err)
+ assert.Equal(t, "20s",
cli.cliOpts.overallReference.KeepAliveInterval)
+ assert.Equal(t, "30s",
cli.cliOpts.overallReference.KeepAliveTimeout)
+ },
+ },
+ }
+ processNewClientCases(t, cases)
+}
diff --git a/common/constant/default.go b/common/constant/default.go
index 1abae1d02..edd08301a 100644
--- a/common/constant/default.go
+++ b/common/constant/default.go
@@ -19,6 +19,7 @@ package constant
import (
"math"
+ "time"
)
const (
@@ -103,6 +104,10 @@ const (
DefaultMaxCallRecvMsgSize = 1024 * 1024 * 4
DefaultMaxCallSendMsgSize = math.MaxInt32
+
+ DefaultKeepAliveInterval = "10s"
+ DefaultKeepAliveTimeout = "20s"
+ MinKeepAliveInterval = 10 * time.Second // KeepAliveMinInterval is
the minimum ping interval to invoid too many ping
)
const (
diff --git a/common/constant/key.go b/common/constant/key.go
index 805949a6a..192505812 100644
--- a/common/constant/key.go
+++ b/common/constant/key.go
@@ -62,6 +62,8 @@ const (
MaxServerSendMsgSize = "max-server-send-msg-size"
MaxCallRecvMsgSize = "max-call-recv-msg-size"
MaxServerRecvMsgSize = "max-server-recv-msg-size"
+ KeepAliveInterval = "keep-alive-interval"
+ KeepAliveTimeout = "keep-alive-timeout"
)
// tls constant
diff --git a/common/url.go b/common/url.go
index 6156af23d..2a5dff8da 100644
--- a/common/url.go
+++ b/common/url.go
@@ -971,7 +971,7 @@ func GetCompareURLEqualFunc() CompareURLEqualFunc {
return compareURLEqualFunc
}
-// GetParamDuration get duration if param is invalid or missing will return 3s
+// GetParamDuration get duration if param is invalid or missing default value
will return 3s
func (c *URL) GetParamDuration(s string, d string) time.Duration {
if t, err := time.ParseDuration(c.GetParam(s, d)); err == nil {
return t
diff --git a/global/reference_config.go b/global/reference_config.go
index 94b188cb9..f5eb23335 100644
--- a/global/reference_config.go
+++ b/global/reference_config.go
@@ -23,28 +23,30 @@ import (
// ReferenceConfig is the configuration of service consumer
type ReferenceConfig struct {
- InterfaceName string `yaml:"interface"
json:"interface,omitempty" property:"interface"`
- Check *bool `yaml:"check"
json:"check,omitempty" property:"check"`
- URL string `yaml:"url" json:"url,omitempty"
property:"url"`
- Filter string `yaml:"filter"
json:"filter,omitempty" property:"filter"`
- Protocol string `yaml:"protocol"
json:"protocol,omitempty" property:"protocol"`
- RegistryIDs []string `yaml:"registry-ids"
json:"registry-ids,omitempty" property:"registry-ids"`
- Cluster string `yaml:"cluster"
json:"cluster,omitempty" property:"cluster"`
- Loadbalance string `yaml:"loadbalance"
json:"loadbalance,omitempty" property:"loadbalance"`
- Retries string `yaml:"retries"
json:"retries,omitempty" property:"retries"`
- Group string `yaml:"group"
json:"group,omitempty" property:"group"`
- Version string `yaml:"version"
json:"version,omitempty" property:"version"`
- Serialization string `yaml:"serialization"
json:"serialization" property:"serialization"`
- ProvidedBy string `yaml:"provided_by"
json:"provided_by,omitempty" property:"provided_by"`
- Methods []*MethodConfig `yaml:"methods"
json:"methods,omitempty" property:"methods"`
- Async bool `yaml:"async"
json:"async,omitempty" property:"async"`
- Params map[string]string `yaml:"params"
json:"params,omitempty" property:"params"`
- Generic string `yaml:"generic"
json:"generic,omitempty" property:"generic"`
- Sticky bool `yaml:"sticky"
json:"sticky,omitempty" property:"sticky"`
- RequestTimeout string `yaml:"timeout"
json:"timeout,omitempty" property:"timeout"`
- ForceTag bool `yaml:"force.tag"
json:"force.tag,omitempty" property:"force.tag"`
- TracingKey string `yaml:"tracing-key"
json:"tracing-key,omitempty" propertiy:"tracing-key"`
- MeshProviderPort int `yaml:"mesh-provider-port"
json:"mesh-provider-port,omitempty" propertiy:"mesh-provider-port"`
+ InterfaceName string `yaml:"interface"
json:"interface,omitempty" property:"interface"`
+ Check *bool `yaml:"check"
json:"check,omitempty" property:"check"`
+ URL string `yaml:"url" json:"url,omitempty"
property:"url"`
+ Filter string `yaml:"filter"
json:"filter,omitempty" property:"filter"`
+ Protocol string `yaml:"protocol"
json:"protocol,omitempty" property:"protocol"`
+ RegistryIDs []string `yaml:"registry-ids"
json:"registry-ids,omitempty" property:"registry-ids"`
+ Cluster string `yaml:"cluster"
json:"cluster,omitempty" property:"cluster"`
+ Loadbalance string `yaml:"loadbalance"
json:"loadbalance,omitempty" property:"loadbalance"`
+ Retries string `yaml:"retries"
json:"retries,omitempty" property:"retries"`
+ Group string `yaml:"group"
json:"group,omitempty" property:"group"`
+ Version string `yaml:"version"
json:"version,omitempty" property:"version"`
+ Serialization string `yaml:"serialization"
json:"serialization" property:"serialization"`
+ ProvidedBy string `yaml:"provided_by"
json:"provided_by,omitempty" property:"provided_by"`
+ Methods []*MethodConfig `yaml:"methods"
json:"methods,omitempty" property:"methods"`
+ Async bool `yaml:"async"
json:"async,omitempty" property:"async"`
+ Params map[string]string `yaml:"params"
json:"params,omitempty" property:"params"`
+ Generic string `yaml:"generic"
json:"generic,omitempty" property:"generic"`
+ Sticky bool `yaml:"sticky"
json:"sticky,omitempty" property:"sticky"`
+ RequestTimeout string `yaml:"timeout"
json:"timeout,omitempty" property:"timeout"`
+ ForceTag bool `yaml:"force.tag"
json:"force.tag,omitempty" property:"force.tag"`
+ TracingKey string `yaml:"tracing-key"
json:"tracing-key,omitempty" propertiy:"tracing-key"`
+ MeshProviderPort int `yaml:"mesh-provider-port"
json:"mesh-provider-port,omitempty" propertiy:"mesh-provider-port"`
+ KeepAliveInterval string `yaml:"keep-alive-interval"
json:"keep-alive-interval,omitempty" property:"keep-alive-interval"`
+ KeepAliveTimeout string `yaml:"keep-alive-timeout"
json:"keep-alive-timeout,omitempty" property:"keep-alive-timeout"`
}
func DefaultReferenceConfig() *ReferenceConfig {
@@ -121,6 +123,12 @@ func (c *ReferenceConfig) GetOptions() []ReferenceOption {
if c.MeshProviderPort != 0 {
refOpts = append(refOpts,
WithReference_MeshProviderPort(c.MeshProviderPort))
}
+ if c.KeepAliveInterval != "" {
+ refOpts = append(refOpts,
WithReference_KeepAliveInterval(c.KeepAliveInterval))
+ }
+ if c.KeepAliveTimeout != "" {
+ refOpts = append(refOpts,
WithReference_KeepAliveTimeout(c.KeepAliveTimeout))
+ }
return refOpts
}
@@ -153,28 +161,30 @@ func (c *ReferenceConfig) Clone() *ReferenceConfig {
}
return &ReferenceConfig{
- InterfaceName: c.InterfaceName,
- Check: newCheck,
- URL: c.URL,
- Filter: c.Filter,
- Protocol: c.Protocol,
- RegistryIDs: newRegistryIDs,
- Cluster: c.Cluster,
- Loadbalance: c.Loadbalance,
- Retries: c.Retries,
- Group: c.Group,
- Version: c.Version,
- Serialization: c.Serialization,
- ProvidedBy: c.ProvidedBy,
- Methods: newMethods,
- Async: c.Async,
- Params: newParams,
- Generic: c.Generic,
- Sticky: c.Sticky,
- RequestTimeout: c.RequestTimeout,
- ForceTag: c.ForceTag,
- TracingKey: c.TracingKey,
- MeshProviderPort: c.MeshProviderPort,
+ InterfaceName: c.InterfaceName,
+ Check: newCheck,
+ URL: c.URL,
+ Filter: c.Filter,
+ Protocol: c.Protocol,
+ RegistryIDs: newRegistryIDs,
+ Cluster: c.Cluster,
+ Loadbalance: c.Loadbalance,
+ Retries: c.Retries,
+ Group: c.Group,
+ Version: c.Version,
+ Serialization: c.Serialization,
+ ProvidedBy: c.ProvidedBy,
+ Methods: newMethods,
+ Async: c.Async,
+ Params: newParams,
+ Generic: c.Generic,
+ Sticky: c.Sticky,
+ RequestTimeout: c.RequestTimeout,
+ ForceTag: c.ForceTag,
+ TracingKey: c.TracingKey,
+ MeshProviderPort: c.MeshProviderPort,
+ KeepAliveInterval: c.KeepAliveInterval,
+ KeepAliveTimeout: c.KeepAliveTimeout,
}
}
@@ -307,3 +317,15 @@ func WithReference_MeshProviderPort(port int)
ReferenceOption {
cfg.MeshProviderPort = port
}
}
+
+func WithReference_KeepAliveInterval(interval string) ReferenceOption {
+ return func(cfg *ReferenceConfig) {
+ cfg.KeepAliveInterval = interval
+ }
+}
+
+func WithReference_KeepAliveTimeout(timeout string) ReferenceOption {
+ return func(cfg *ReferenceConfig) {
+ cfg.KeepAliveTimeout = timeout
+ }
+}
diff --git a/go.mod b/go.mod
index 119c2206c..78d5b41e4 100644
--- a/go.mod
+++ b/go.mod
@@ -18,7 +18,7 @@ require (
github.com/dubbogo/go-zookeeper v1.0.4-0.20211212162352-f9d2183d89d5
github.com/dubbogo/gost v1.14.0
github.com/dubbogo/grpc-go v1.42.10
- github.com/dubbogo/triple v1.2.2-rc3
+ github.com/dubbogo/triple v1.2.2-rc4
github.com/dustin/go-humanize v1.0.1
github.com/emicklei/go-restful/v3 v3.10.1
github.com/envoyproxy/go-control-plane v0.11.0
diff --git a/go.sum b/go.sum
index 59d6fec87..e4be3bb8e 100644
--- a/go.sum
+++ b/go.sum
@@ -192,8 +192,8 @@ github.com/dubbogo/grpc-go v1.42.10/go.mod
h1:JMkPt1mIHL96GAFeYsMoMjew6f1ROKycik
github.com/dubbogo/jsonparser v1.0.1/go.mod
h1:tYAtpctvSP/tWw4MeelsowSPgXQRVHHWbqL6ynps8jU=
github.com/dubbogo/net v0.0.4/go.mod
h1:1CGOnM7X3he+qgGNqjeADuE5vKZQx/eMSeUkpU3ujIc=
github.com/dubbogo/triple v1.0.9/go.mod
h1:1t9me4j4CTvNDcsMZy6/OGarbRyAUSY0tFXGXHCp7Iw=
-github.com/dubbogo/triple v1.2.2-rc3
h1:9rxLqru35MmJkypCHJMiZb1VzwH+zmbPBend9Cq+VOI=
-github.com/dubbogo/triple v1.2.2-rc3/go.mod
h1:9pgEahtmsY/avYJp3dzUQE8CMMVe1NtGBmUhfICKLJk=
+github.com/dubbogo/triple v1.2.2-rc4
h1:zL15Fb6W/yNAFQve5eqpTOEWWD9dpTFq78mdeSKc2pk=
+github.com/dubbogo/triple v1.2.2-rc4/go.mod
h1:9pgEahtmsY/avYJp3dzUQE8CMMVe1NtGBmUhfICKLJk=
github.com/dustin/go-humanize v0.0.0-20171111073723-bb3d318650d4/go.mod
h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk=
github.com/dustin/go-humanize v1.0.0/go.mod
h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk=
github.com/dustin/go-humanize v1.0.1
h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY=
diff --git a/protocol/triple/client.go b/protocol/triple/client.go
index 66a3e5331..eee0c2b41 100644
--- a/protocol/triple/client.go
+++ b/protocol/triple/client.go
@@ -131,7 +131,9 @@ func newClientManager(url *common.URL) (*clientManager,
error) {
maxCallSendMsgSize = int(sendMsgSize)
}
cliOpts = append(cliOpts, tri.WithSendMaxBytes(maxCallSendMsgSize))
-
+ // set keepalive interval and keepalive timeout
+ keepAliveInterval := url.GetParamDuration(constant.KeepAliveInterval,
constant.DefaultKeepAliveInterval)
+ keepAliveTimeout := url.GetParamDuration(constant.KeepAliveTimeout,
constant.DefaultKeepAliveTimeout)
var isIDL bool
// set serialization
serialization := url.GetParam(constant.SerializationKey,
constant.ProtobufSerialization)
@@ -182,7 +184,9 @@ func newClientManager(url *common.URL) (*clientManager,
error) {
DialTLSContext: func(_ context.Context,
network, addr string, _ *tls.Config) (net.Conn, error) {
return net.Dial(network, addr)
},
- AllowHTTP: true,
+ AllowHTTP: true,
+ ReadIdleTimeout: keepAliveInterval,
+ PingTimeout: keepAliveTimeout,
}
}
default:
diff --git a/protocol/triple/dubbo3_invoker.go
b/protocol/triple/dubbo3_invoker.go
index 0aa0b4f71..15a245abd 100644
--- a/protocol/triple/dubbo3_invoker.go
+++ b/protocol/triple/dubbo3_invoker.go
@@ -96,6 +96,11 @@ func NewDubbo3Invoker(url *common.URL) (*DubboInvoker,
error) {
}
opts = append(opts,
triConfig.WithGRPCMaxCallRecvMessageSize(maxCallRecvMsgSize))
opts = append(opts,
triConfig.WithGRPCMaxCallSendMessageSize(maxCallSendMsgSize))
+ // grpc keepalive config
+ keepAliveInterval := url.GetParamDuration(constant.KeepAliveInterval,
constant.DefaultKeepAliveInterval)
+ keepAliveTimeout := url.GetParamDuration(constant.KeepAliveTimeout,
constant.DefaultKeepAliveTimeout)
+ opts = append(opts,
triConfig.WithGRPCKeepAliveTimeInterval(keepAliveInterval))
+ opts = append(opts,
triConfig.WithGRPCKeepAliveTimeout(keepAliveTimeout))
tracingKey := url.GetParam(constant.TracingConfigKey, "")
if tracingKey != "" {