This is an automated email from the ASF dual-hosted git repository.

baerwang pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/dubbo-go.git


The following commit(s) were added to refs/heads/develop by this push:
     new 7a9b5c7f5 feat:add keepalive config for triple portocol (#2757)
7a9b5c7f5 is described below

commit 7a9b5c7f5bc8b4530a3a2f276ccb278d9fedfb14
Author: xinfan.wu <[email protected]>
AuthorDate: Fri Dec 6 14:15:33 2024 +0800

    feat:add keepalive config for triple portocol (#2757)
    
    * fix:set url to nil when destroy base invoker and change the order of 
related references to avoid panic
    
    * add comment at  BaseInvoker.Destroy()
    
    * feat:add keepalive config and pass through to http2 transport
    
    * chore:update comments for clarity
    
    * chore:update comments for clarity
    
    * feat:add keepalive parameters to dubbogo/triple and preallocate the 
config options to avoid memory copying and allocation caused by dynamic 
expansion of slices.
    
    * feat:add keepalive parameters, default interval is 10s, and timeout value 
is 20s
    
    * fix:golang lint ci error
    
    * fix:typo
    
    * refactor:add space
    
    * refactor:add space for comment
    
    * refactor:add space for comment
---
 client/action.go                  |   2 +
 client/options.go                 |  19 +++++++
 client/options_test.go            |  38 +++++++++++++
 common/constant/default.go        |   5 ++
 common/constant/key.go            |   2 +
 common/url.go                     |   2 +-
 global/reference_config.go        | 110 +++++++++++++++++++++++---------------
 go.mod                            |   2 +-
 go.sum                            |   4 +-
 protocol/triple/client.go         |   8 ++-
 protocol/triple/dubbo3_invoker.go |   5 ++
 11 files changed, 147 insertions(+), 50 deletions(-)

diff --git a/client/action.go b/client/action.go
index b5a0fafd6..cd4ebd149 100644
--- a/client/action.go
+++ b/client/action.go
@@ -127,6 +127,8 @@ func (refOpts *ReferenceOptions) refer(srv 
common.RPCService, info *ClientInfo)
                common.WithParams(refOpts.getURLMap()),
                common.WithParamsValue(constant.BeanNameKey, refOpts.id),
                common.WithParamsValue(constant.MetadataTypeKey, 
refOpts.metaDataType),
+               common.WithParamsValue(constant.KeepAliveInterval, 
ref.KeepAliveInterval),
+               common.WithParamsValue(constant.KeepAliveTimeout, 
ref.KeepAliveTimeout),
        )
        if info != nil {
                cfgURL.SetAttribute(constant.ClientInfoKey, info)
diff --git a/client/options.go b/client/options.go
index ccb81f0df..28610e24e 100644
--- a/client/options.go
+++ b/client/options.go
@@ -638,6 +638,25 @@ func WithClientClusterStrategy(strategy string) 
ClientOption {
        }
 }
 
+// If there is no other traffic on the connection, the ping will be sent, only 
works for 'tri' protocol with http2.
+// A minimum value of 10s will be used instead to invoid 'too many pings'.If 
not set, default value is 10s.
+func WithKeepAliveInterval(keepAliveInterval time.Duration) ClientOption {
+       if keepAliveInterval < constant.MinKeepAliveInterval {
+               keepAliveInterval = constant.MinKeepAliveInterval
+       }
+       return func(opts *ClientOptions) {
+               opts.overallReference.KeepAliveInterval = 
keepAliveInterval.String()
+       }
+}
+
+// WithKeepAliveTimeout is timeout after which the connection will be closed, 
only works for 'tri' protocol with http2
+// If not set, default value is 20s.
+func WithKeepAliveTimeout(keepAliveTimeout time.Duration) ClientOption {
+       return func(opts *ClientOptions) {
+               opts.overallReference.KeepAliveTimeout = 
keepAliveTimeout.String()
+       }
+}
+
 // ========== LoadBalance Strategy ==========
 
 func WithClientLoadBalanceConsistentHashing() ClientOption {
diff --git a/client/options_test.go b/client/options_test.go
index 66d7d9d0d..4983e2d23 100644
--- a/client/options_test.go
+++ b/client/options_test.go
@@ -1193,3 +1193,41 @@ func TestWithMeshProviderPort(t *testing.T) {
        }
        processReferenceOptionsInitCases(t, cases)
 }
+
+func TestWithKeepAliveConfig(t *testing.T) {
+       cases := []newClientCase{
+               {
+                       desc: "config keepalive interval with less than 10s",
+                       opts: []ClientOption{
+                               WithKeepAliveInterval(time.Second * 5), //less 
than 10s(min ping interval),should be set to 10s
+                       },
+                       verify: func(t *testing.T, cli *Client, err error) {
+                               assert.Nil(t, err)
+                               assert.Equal(t, "10s", 
cli.cliOpts.overallReference.KeepAliveInterval)
+                       },
+               },
+               {
+                       desc: "config keepalive interval with larger than 10s",
+                       opts: []ClientOption{
+                               WithKeepAliveInterval(time.Second * 20),
+                       },
+                       verify: func(t *testing.T, cli *Client, err error) {
+                               assert.Nil(t, err)
+                               assert.Equal(t, "20s", 
cli.cliOpts.overallReference.KeepAliveInterval)
+                       },
+               },
+               {
+                       desc: "config keepalive interval and timeout",
+                       opts: []ClientOption{
+                               WithKeepAliveInterval(time.Second * 20),
+                               WithKeepAliveTimeout(time.Second * 30),
+                       },
+                       verify: func(t *testing.T, cli *Client, err error) {
+                               assert.Nil(t, err)
+                               assert.Equal(t, "20s", 
cli.cliOpts.overallReference.KeepAliveInterval)
+                               assert.Equal(t, "30s", 
cli.cliOpts.overallReference.KeepAliveTimeout)
+                       },
+               },
+       }
+       processNewClientCases(t, cases)
+}
diff --git a/common/constant/default.go b/common/constant/default.go
index 1abae1d02..edd08301a 100644
--- a/common/constant/default.go
+++ b/common/constant/default.go
@@ -19,6 +19,7 @@ package constant
 
 import (
        "math"
+       "time"
 )
 
 const (
@@ -103,6 +104,10 @@ const (
 
        DefaultMaxCallRecvMsgSize = 1024 * 1024 * 4
        DefaultMaxCallSendMsgSize = math.MaxInt32
+
+       DefaultKeepAliveInterval = "10s"
+       DefaultKeepAliveTimeout  = "20s"
+       MinKeepAliveInterval     = 10 * time.Second // KeepAliveMinInterval is 
the minimum ping interval to invoid too many ping
 )
 
 const (
diff --git a/common/constant/key.go b/common/constant/key.go
index 805949a6a..192505812 100644
--- a/common/constant/key.go
+++ b/common/constant/key.go
@@ -62,6 +62,8 @@ const (
        MaxServerSendMsgSize   = "max-server-send-msg-size"
        MaxCallRecvMsgSize     = "max-call-recv-msg-size"
        MaxServerRecvMsgSize   = "max-server-recv-msg-size"
+       KeepAliveInterval      = "keep-alive-interval"
+       KeepAliveTimeout       = "keep-alive-timeout"
 )
 
 // tls constant
diff --git a/common/url.go b/common/url.go
index 6156af23d..2a5dff8da 100644
--- a/common/url.go
+++ b/common/url.go
@@ -971,7 +971,7 @@ func GetCompareURLEqualFunc() CompareURLEqualFunc {
        return compareURLEqualFunc
 }
 
-// GetParamDuration get duration if param is invalid or missing will return 3s
+// GetParamDuration get duration if param is invalid or missing default value 
will return 3s
 func (c *URL) GetParamDuration(s string, d string) time.Duration {
        if t, err := time.ParseDuration(c.GetParam(s, d)); err == nil {
                return t
diff --git a/global/reference_config.go b/global/reference_config.go
index 94b188cb9..f5eb23335 100644
--- a/global/reference_config.go
+++ b/global/reference_config.go
@@ -23,28 +23,30 @@ import (
 
 // ReferenceConfig is the configuration of service consumer
 type ReferenceConfig struct {
-       InterfaceName    string            `yaml:"interface"  
json:"interface,omitempty" property:"interface"`
-       Check            *bool             `yaml:"check"  
json:"check,omitempty" property:"check"`
-       URL              string            `yaml:"url"  json:"url,omitempty" 
property:"url"`
-       Filter           string            `yaml:"filter" 
json:"filter,omitempty" property:"filter"`
-       Protocol         string            `yaml:"protocol"  
json:"protocol,omitempty" property:"protocol"`
-       RegistryIDs      []string          `yaml:"registry-ids"  
json:"registry-ids,omitempty"  property:"registry-ids"`
-       Cluster          string            `yaml:"cluster"  
json:"cluster,omitempty" property:"cluster"`
-       Loadbalance      string            `yaml:"loadbalance"  
json:"loadbalance,omitempty" property:"loadbalance"`
-       Retries          string            `yaml:"retries"  
json:"retries,omitempty" property:"retries"`
-       Group            string            `yaml:"group"  
json:"group,omitempty" property:"group"`
-       Version          string            `yaml:"version"  
json:"version,omitempty" property:"version"`
-       Serialization    string            `yaml:"serialization" 
json:"serialization" property:"serialization"`
-       ProvidedBy       string            `yaml:"provided_by"  
json:"provided_by,omitempty" property:"provided_by"`
-       Methods          []*MethodConfig   `yaml:"methods"  
json:"methods,omitempty" property:"methods"`
-       Async            bool              `yaml:"async"  
json:"async,omitempty" property:"async"`
-       Params           map[string]string `yaml:"params"  
json:"params,omitempty" property:"params"`
-       Generic          string            `yaml:"generic"  
json:"generic,omitempty" property:"generic"`
-       Sticky           bool              `yaml:"sticky"   
json:"sticky,omitempty" property:"sticky"`
-       RequestTimeout   string            `yaml:"timeout"  
json:"timeout,omitempty" property:"timeout"`
-       ForceTag         bool              `yaml:"force.tag"  
json:"force.tag,omitempty" property:"force.tag"`
-       TracingKey       string            `yaml:"tracing-key" 
json:"tracing-key,omitempty" propertiy:"tracing-key"`
-       MeshProviderPort int               `yaml:"mesh-provider-port" 
json:"mesh-provider-port,omitempty" propertiy:"mesh-provider-port"`
+       InterfaceName     string            `yaml:"interface"  
json:"interface,omitempty" property:"interface"`
+       Check             *bool             `yaml:"check"  
json:"check,omitempty" property:"check"`
+       URL               string            `yaml:"url"  json:"url,omitempty" 
property:"url"`
+       Filter            string            `yaml:"filter" 
json:"filter,omitempty" property:"filter"`
+       Protocol          string            `yaml:"protocol"  
json:"protocol,omitempty" property:"protocol"`
+       RegistryIDs       []string          `yaml:"registry-ids"  
json:"registry-ids,omitempty"  property:"registry-ids"`
+       Cluster           string            `yaml:"cluster"  
json:"cluster,omitempty" property:"cluster"`
+       Loadbalance       string            `yaml:"loadbalance"  
json:"loadbalance,omitempty" property:"loadbalance"`
+       Retries           string            `yaml:"retries"  
json:"retries,omitempty" property:"retries"`
+       Group             string            `yaml:"group"  
json:"group,omitempty" property:"group"`
+       Version           string            `yaml:"version"  
json:"version,omitempty" property:"version"`
+       Serialization     string            `yaml:"serialization" 
json:"serialization" property:"serialization"`
+       ProvidedBy        string            `yaml:"provided_by"  
json:"provided_by,omitempty" property:"provided_by"`
+       Methods           []*MethodConfig   `yaml:"methods"  
json:"methods,omitempty" property:"methods"`
+       Async             bool              `yaml:"async"  
json:"async,omitempty" property:"async"`
+       Params            map[string]string `yaml:"params"  
json:"params,omitempty" property:"params"`
+       Generic           string            `yaml:"generic"  
json:"generic,omitempty" property:"generic"`
+       Sticky            bool              `yaml:"sticky"   
json:"sticky,omitempty" property:"sticky"`
+       RequestTimeout    string            `yaml:"timeout"  
json:"timeout,omitempty" property:"timeout"`
+       ForceTag          bool              `yaml:"force.tag"  
json:"force.tag,omitempty" property:"force.tag"`
+       TracingKey        string            `yaml:"tracing-key" 
json:"tracing-key,omitempty" propertiy:"tracing-key"`
+       MeshProviderPort  int               `yaml:"mesh-provider-port" 
json:"mesh-provider-port,omitempty" propertiy:"mesh-provider-port"`
+       KeepAliveInterval string            `yaml:"keep-alive-interval" 
json:"keep-alive-interval,omitempty" property:"keep-alive-interval"`
+       KeepAliveTimeout  string            `yaml:"keep-alive-timeout" 
json:"keep-alive-timeout,omitempty" property:"keep-alive-timeout"`
 }
 
 func DefaultReferenceConfig() *ReferenceConfig {
@@ -121,6 +123,12 @@ func (c *ReferenceConfig) GetOptions() []ReferenceOption {
        if c.MeshProviderPort != 0 {
                refOpts = append(refOpts, 
WithReference_MeshProviderPort(c.MeshProviderPort))
        }
+       if c.KeepAliveInterval != "" {
+               refOpts = append(refOpts, 
WithReference_KeepAliveInterval(c.KeepAliveInterval))
+       }
+       if c.KeepAliveTimeout != "" {
+               refOpts = append(refOpts, 
WithReference_KeepAliveTimeout(c.KeepAliveTimeout))
+       }
        return refOpts
 }
 
@@ -153,28 +161,30 @@ func (c *ReferenceConfig) Clone() *ReferenceConfig {
        }
 
        return &ReferenceConfig{
-               InterfaceName:    c.InterfaceName,
-               Check:            newCheck,
-               URL:              c.URL,
-               Filter:           c.Filter,
-               Protocol:         c.Protocol,
-               RegistryIDs:      newRegistryIDs,
-               Cluster:          c.Cluster,
-               Loadbalance:      c.Loadbalance,
-               Retries:          c.Retries,
-               Group:            c.Group,
-               Version:          c.Version,
-               Serialization:    c.Serialization,
-               ProvidedBy:       c.ProvidedBy,
-               Methods:          newMethods,
-               Async:            c.Async,
-               Params:           newParams,
-               Generic:          c.Generic,
-               Sticky:           c.Sticky,
-               RequestTimeout:   c.RequestTimeout,
-               ForceTag:         c.ForceTag,
-               TracingKey:       c.TracingKey,
-               MeshProviderPort: c.MeshProviderPort,
+               InterfaceName:     c.InterfaceName,
+               Check:             newCheck,
+               URL:               c.URL,
+               Filter:            c.Filter,
+               Protocol:          c.Protocol,
+               RegistryIDs:       newRegistryIDs,
+               Cluster:           c.Cluster,
+               Loadbalance:       c.Loadbalance,
+               Retries:           c.Retries,
+               Group:             c.Group,
+               Version:           c.Version,
+               Serialization:     c.Serialization,
+               ProvidedBy:        c.ProvidedBy,
+               Methods:           newMethods,
+               Async:             c.Async,
+               Params:            newParams,
+               Generic:           c.Generic,
+               Sticky:            c.Sticky,
+               RequestTimeout:    c.RequestTimeout,
+               ForceTag:          c.ForceTag,
+               TracingKey:        c.TracingKey,
+               MeshProviderPort:  c.MeshProviderPort,
+               KeepAliveInterval: c.KeepAliveInterval,
+               KeepAliveTimeout:  c.KeepAliveTimeout,
        }
 }
 
@@ -307,3 +317,15 @@ func WithReference_MeshProviderPort(port int) 
ReferenceOption {
                cfg.MeshProviderPort = port
        }
 }
+
+func WithReference_KeepAliveInterval(interval string) ReferenceOption {
+       return func(cfg *ReferenceConfig) {
+               cfg.KeepAliveInterval = interval
+       }
+}
+
+func WithReference_KeepAliveTimeout(timeout string) ReferenceOption {
+       return func(cfg *ReferenceConfig) {
+               cfg.KeepAliveTimeout = timeout
+       }
+}
diff --git a/go.mod b/go.mod
index 119c2206c..78d5b41e4 100644
--- a/go.mod
+++ b/go.mod
@@ -18,7 +18,7 @@ require (
        github.com/dubbogo/go-zookeeper v1.0.4-0.20211212162352-f9d2183d89d5
        github.com/dubbogo/gost v1.14.0
        github.com/dubbogo/grpc-go v1.42.10
-       github.com/dubbogo/triple v1.2.2-rc3
+       github.com/dubbogo/triple v1.2.2-rc4
        github.com/dustin/go-humanize v1.0.1
        github.com/emicklei/go-restful/v3 v3.10.1
        github.com/envoyproxy/go-control-plane v0.11.0
diff --git a/go.sum b/go.sum
index 59d6fec87..e4be3bb8e 100644
--- a/go.sum
+++ b/go.sum
@@ -192,8 +192,8 @@ github.com/dubbogo/grpc-go v1.42.10/go.mod 
h1:JMkPt1mIHL96GAFeYsMoMjew6f1ROKycik
 github.com/dubbogo/jsonparser v1.0.1/go.mod 
h1:tYAtpctvSP/tWw4MeelsowSPgXQRVHHWbqL6ynps8jU=
 github.com/dubbogo/net v0.0.4/go.mod 
h1:1CGOnM7X3he+qgGNqjeADuE5vKZQx/eMSeUkpU3ujIc=
 github.com/dubbogo/triple v1.0.9/go.mod 
h1:1t9me4j4CTvNDcsMZy6/OGarbRyAUSY0tFXGXHCp7Iw=
-github.com/dubbogo/triple v1.2.2-rc3 
h1:9rxLqru35MmJkypCHJMiZb1VzwH+zmbPBend9Cq+VOI=
-github.com/dubbogo/triple v1.2.2-rc3/go.mod 
h1:9pgEahtmsY/avYJp3dzUQE8CMMVe1NtGBmUhfICKLJk=
+github.com/dubbogo/triple v1.2.2-rc4 
h1:zL15Fb6W/yNAFQve5eqpTOEWWD9dpTFq78mdeSKc2pk=
+github.com/dubbogo/triple v1.2.2-rc4/go.mod 
h1:9pgEahtmsY/avYJp3dzUQE8CMMVe1NtGBmUhfICKLJk=
 github.com/dustin/go-humanize v0.0.0-20171111073723-bb3d318650d4/go.mod 
h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk=
 github.com/dustin/go-humanize v1.0.0/go.mod 
h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk=
 github.com/dustin/go-humanize v1.0.1 
h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY=
diff --git a/protocol/triple/client.go b/protocol/triple/client.go
index 66a3e5331..eee0c2b41 100644
--- a/protocol/triple/client.go
+++ b/protocol/triple/client.go
@@ -131,7 +131,9 @@ func newClientManager(url *common.URL) (*clientManager, 
error) {
                maxCallSendMsgSize = int(sendMsgSize)
        }
        cliOpts = append(cliOpts, tri.WithSendMaxBytes(maxCallSendMsgSize))
-
+       // set keepalive interval and keepalive timeout
+       keepAliveInterval := url.GetParamDuration(constant.KeepAliveInterval, 
constant.DefaultKeepAliveInterval)
+       keepAliveTimeout := url.GetParamDuration(constant.KeepAliveTimeout, 
constant.DefaultKeepAliveTimeout)
        var isIDL bool
        // set serialization
        serialization := url.GetParam(constant.SerializationKey, 
constant.ProtobufSerialization)
@@ -182,7 +184,9 @@ func newClientManager(url *common.URL) (*clientManager, 
error) {
                                DialTLSContext: func(_ context.Context, 
network, addr string, _ *tls.Config) (net.Conn, error) {
                                        return net.Dial(network, addr)
                                },
-                               AllowHTTP: true,
+                               AllowHTTP:       true,
+                               ReadIdleTimeout: keepAliveInterval,
+                               PingTimeout:     keepAliveTimeout,
                        }
                }
        default:
diff --git a/protocol/triple/dubbo3_invoker.go 
b/protocol/triple/dubbo3_invoker.go
index 0aa0b4f71..15a245abd 100644
--- a/protocol/triple/dubbo3_invoker.go
+++ b/protocol/triple/dubbo3_invoker.go
@@ -96,6 +96,11 @@ func NewDubbo3Invoker(url *common.URL) (*DubboInvoker, 
error) {
        }
        opts = append(opts, 
triConfig.WithGRPCMaxCallRecvMessageSize(maxCallRecvMsgSize))
        opts = append(opts, 
triConfig.WithGRPCMaxCallSendMessageSize(maxCallSendMsgSize))
+       // grpc keepalive config
+       keepAliveInterval := url.GetParamDuration(constant.KeepAliveInterval, 
constant.DefaultKeepAliveInterval)
+       keepAliveTimeout := url.GetParamDuration(constant.KeepAliveTimeout, 
constant.DefaultKeepAliveTimeout)
+       opts = append(opts, 
triConfig.WithGRPCKeepAliveTimeInterval(keepAliveInterval))
+       opts = append(opts, 
triConfig.WithGRPCKeepAliveTimeout(keepAliveTimeout))
 
        tracingKey := url.GetParam(constant.TracingConfigKey, "")
        if tracingKey != "" {

Reply via email to