This is an automated email from the ASF dual-hosted git repository.

alexstocks pushed a commit to branch 1.5
in repository https://gitbox.apache.org/repos/asf/dubbo-go.git


The following commit(s) were added to refs/heads/1.5 by this push:
     new 22ec982  Fix: 1526 (#1527)
22ec982 is described below

commit 22ec982bc6240ff8c5aa97d3d634f4473fa2173a
Author: ChangedenChan <[email protected]>
AuthorDate: Fri Oct 22 22:54:10 2021 +0800

    Fix: 1526 (#1527)
    
    * 支持配置protocol.payload
    
    * dubbo consumer支持配置protocol.payload
    
    * 格式化代码
    
    * 格式化代码
    
    * rename ProtocolConfig.Ip to ProtocolConfig.IP
    
    * rollback rename ProtocolConfig.Ip
---
 common/constant/default.go       | 13 ++++++++++++-
 common/constant/key.go           |  5 +++++
 config/config_loader.go          |  1 +
 config/consumer_config.go        |  1 +
 config/protocol_config.go        |  7 ++++---
 config/reference_config.go       |  9 +++++++++
 protocol/dubbo/dubbo_codec.go    |  1 +
 protocol/dubbo/dubbo_protocol.go |  3 +++
 protocol/dubbo/impl/codec.go     |  8 ++++++--
 protocol/dubbo/impl/package.go   |  1 +
 remoting/exchange.go             |  7 ++++---
 remoting/getty/getty_client.go   |  2 ++
 12 files changed, 49 insertions(+), 9 deletions(-)

diff --git a/common/constant/default.go b/common/constant/default.go
index a6790f9..1ea9e1c 100644
--- a/common/constant/default.go
+++ b/common/constant/default.go
@@ -17,7 +17,10 @@
 
 package constant
 
-import "time"
+import (
+       "strconv"
+       "time"
+)
 
 const (
        DUBBO             = "dubbo"
@@ -90,3 +93,11 @@ const (
 const (
        SERVICE_DISCOVERY_DEFAULT_GROUP = "DEFAULT_GROUP"
 )
+
+const (
+       DefaultProtocolPayload = 8388608
+)
+
+var (
+       DefaultProtocolPayloadStr = strconv.Itoa(DefaultProtocolPayload)
+)
diff --git a/common/constant/key.go b/common/constant/key.go
index bd161ab..d8a66c1 100644
--- a/common/constant/key.go
+++ b/common/constant/key.go
@@ -372,3 +372,8 @@ const (
        AfterAllServicesListenCompleteFunctionName    = "AfterAllServicesListenComplete"
        BeforeShutdownFunctionName                    = "BeforeShutdown"
 )
+
+// protocol config
+const (
+       ProtocolPayload = "protocol.payload"
+)
diff --git a/config/config_loader.go b/config/config_loader.go
index d9288f9..741137e 100644
--- a/config/config_loader.go
+++ b/config/config_loader.go
@@ -139,6 +139,7 @@ func loadConsumerConfig() {
 
        checkRegistries(consumerConfig.Registries, consumerConfig.Registry)
        for key, ref := range consumerConfig.References {
+               ref.Protocols = consumerConfig.Protocols
                if ref.Generic {
                        genericService := NewGenericService(key)
                        SetConsumerService(genericService)
diff --git a/config/consumer_config.go b/config/consumer_config.go
index c50c2a7..de05224 100644
--- a/config/consumer_config.go
+++ b/config/consumer_config.go
@@ -59,6 +59,7 @@ type ConsumerConfig struct {
        Check           *bool  `yaml:"check"  json:"check,omitempty" property:"check"`
 
        References     map[string]*ReferenceConfig `yaml:"references" json:"references,omitempty" property:"references"`
+       Protocols      map[string]*ProtocolConfig  `yaml:"protocols" json:"protocols,omitempty" property:"protocols"`
        ProtocolConf   interface{}                 `yaml:"protocol_conf" json:"protocol_conf,omitempty" property:"protocol_conf"`
        FilterConf     interface{}                 `yaml:"filter_conf" json:"filter_conf,omitempty" property:"filter_conf"`
        ShutdownConfig *ShutdownConfig             `yaml:"shutdown_conf" json:"shutdown_conf,omitempty" property:"shutdown_conf"`
diff --git a/config/protocol_config.go b/config/protocol_config.go
index cee5b7a..f83bece 100644
--- a/config/protocol_config.go
+++ b/config/protocol_config.go
@@ -27,9 +27,10 @@ import (
 
 // ProtocolConfig is protocol configuration
 type ProtocolConfig struct {
-       Name string `required:"true" yaml:"name"  json:"name,omitempty" property:"name"`
-       Ip   string `required:"true" yaml:"ip"  json:"ip,omitempty" property:"ip"`
-       Port string `required:"true" yaml:"port"  json:"port,omitempty" property:"port"`
+       Name    string `required:"true" yaml:"name"  json:"name,omitempty" property:"name"`
+       Ip      string `required:"true" yaml:"ip"  json:"ip,omitempty" property:"ip"`
+       Port    string `required:"true" yaml:"port"  json:"port,omitempty" property:"port"`
+       Payload int    `yaml:"payload" json:"payload,omitempty" property:"payload"`
 }
 
 // nolint
diff --git a/config/reference_config.go b/config/reference_config.go
index 97ac688..48198cf 100644
--- a/config/reference_config.go
+++ b/config/reference_config.go
@@ -66,6 +66,7 @@ type ReferenceConfig struct {
        Sticky         bool   `yaml:"sticky"   json:"sticky,omitempty" property:"sticky"`
        RequestTimeout string `yaml:"timeout"  json:"timeout,omitempty" property:"timeout"`
        ForceTag       bool   `yaml:"force.tag"  json:"force.tag,omitempty" property:"force.tag"`
+       Protocols      map[string]*ProtocolConfig
 }
 
 // nolint
@@ -103,6 +104,14 @@ func (c *ReferenceConfig) Refer(_ interface{}) {
        }
        c.loadProcessConfig(cfgURL, constant.HookEventBeforeReferenceConnect, nil)
        c.postProcessConfig(cfgURL)
+       protocolConfigs := loadProtocol(c.Protocol, c.Protocols)
+       payload := constant.DefaultProtocolPayload
+       if len(protocolConfigs) > 0 {
+               if pl := protocolConfigs[0].Payload; pl > 0 {
+                       payload = pl
+               }
+       }
+       cfgURL.AddParam(constant.ProtocolPayload, strconv.Itoa(payload))
        if c.URL != "" {
                // 1. user specified URL, could be peer-to-peer address, or register center's address.
                urlStrings := gxstrings.RegSplit(c.URL, "\\s*[;]+\\s*")
diff --git a/protocol/dubbo/dubbo_codec.go b/protocol/dubbo/dubbo_codec.go
index 21376c3..0353424 100644
--- a/protocol/dubbo/dubbo_codec.go
+++ b/protocol/dubbo/dubbo_codec.go
@@ -96,6 +96,7 @@ func (c *DubboCodec) EncodeRequest(request *remoting.Request) (*bytes.Buffer, er
                Body:    impl.NewRequestPayload(invocation.Arguments(), invocation.Attachments()),
                Err:     nil,
                Codec:   impl.NewDubboCodec(nil),
+               Payload: request.Payload,
        }
 
        if err := impl.LoadSerializer(pkg); err != nil {
diff --git a/protocol/dubbo/dubbo_protocol.go b/protocol/dubbo/dubbo_protocol.go
index 94afbb1..585c003 100644
--- a/protocol/dubbo/dubbo_protocol.go
+++ b/protocol/dubbo/dubbo_protocol.go
@@ -20,6 +20,7 @@ package dubbo
 import (
        "context"
        "fmt"
+       "strconv"
        "sync"
        "time"
 )
@@ -198,9 +199,11 @@ func getExchangeClient(url *common.URL) *remoting.ExchangeClient {
                                return
                        }
                        // new ExchangeClient
+                       payload, _ := strconv.Atoi(url.GetParam(constant.ProtocolPayload, constant.DefaultProtocolPayloadStr))
                        exchangeClientTmp = remoting.NewExchangeClient(url, getty.NewClient(getty.Options{
                                ConnectTimeout: config.GetConsumerConfig().ConnectTimeout,
                                RequestTimeout: config.GetConsumerConfig().RequestTimeout,
+                               Payload:        payload,
                        }), config.GetConsumerConfig().ConnectTimeout, false)
                        // input store
                        if exchangeClientTmp != nil {
diff --git a/protocol/dubbo/impl/codec.go b/protocol/dubbo/impl/codec.go
index 6c9816f..1b3cb36 100644
--- a/protocol/dubbo/impl/codec.go
+++ b/protocol/dubbo/impl/codec.go
@@ -219,6 +219,10 @@ func packRequest(p DubboPackage, serializer Serializer) ([]byte, error) {
        //////////////////////////////////////////
        // body
        //////////////////////////////////////////
+       payload := DEFAULT_LEN
+       if p.Payload > 0 {
+               payload = p.Payload
+       }
        if p.IsHeartBeat() {
                byteArray = append(byteArray, byte('N'))
                pkgLen = 1
@@ -228,8 +232,8 @@ func packRequest(p DubboPackage, serializer Serializer) ([]byte, error) {
                        return nil, err
                }
                pkgLen = len(body)
-               if pkgLen > int(DEFAULT_LEN) { // 8M
-                       return nil, perrors.Errorf("Data length %d too large, max payload %d", pkgLen, DEFAULT_LEN)
+               if pkgLen > payload { // 8M
+                       return nil, perrors.Errorf("Data length %d too large, max payload %d", pkgLen, payload)
                }
                byteArray = append(byteArray, body...)
        }
diff --git a/protocol/dubbo/impl/package.go b/protocol/dubbo/impl/package.go
index 6f6d2ea..fe329c9 100644
--- a/protocol/dubbo/impl/package.go
+++ b/protocol/dubbo/impl/package.go
@@ -65,6 +65,7 @@ type DubboPackage struct {
        Body    interface{}
        Err     error
        Codec   *ProtocolCodec
+       Payload int
 }
 
 func (p DubboPackage) String() string {
diff --git a/remoting/exchange.go b/remoting/exchange.go
index ad136a7..8d59ff0 100644
--- a/remoting/exchange.go
+++ b/remoting/exchange.go
@@ -59,9 +59,10 @@ type Request struct {
        // serial ID (ignore)
        SerialID byte
        // Data
-       Data   interface{}
-       TwoWay bool
-       Event  bool
+       Data    interface{}
+       TwoWay  bool
+       Event   bool
+       Payload int
 }
 
 // NewRequest aims to create Request.
diff --git a/remoting/getty/getty_client.go b/remoting/getty/getty_client.go
index 7082773..145e1a9 100644
--- a/remoting/getty/getty_client.go
+++ b/remoting/getty/getty_client.go
@@ -119,6 +119,7 @@ type Options struct {
        ConnectTimeout time.Duration
        // request timeout
        RequestTimeout time.Duration
+       Payload        int
 }
 
 // Client : some configuration for network communication.
@@ -257,6 +258,7 @@ func (c *Client) selectSession(addr string) (*gettyRPCClient, getty.Session, err
 }
 
 func (c *Client) transfer(session getty.Session, request *remoting.Request, timeout time.Duration) (int, int, error) {
+       request.Payload = c.opts.Payload
        totalLen, sendLen, err := session.WritePkg(request, timeout)
        return totalLen, sendLen, perrors.WithStack(err)
 }

Reply via email to