This is an automated email from the ASF dual-hosted git repository.
dmwangnima pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/dubbo-go.git
The following commit(s) were added to refs/heads/main by this push:
new d1c41be7e Feature triple api tuning (#2525)
d1c41be7e is described below
commit d1c41be7e28a4bba2e59f634433fa71082db5f1a
Author: Scout Wang <[email protected]>
AuthorDate: Sat Dec 2 09:07:17 2023 +0800
Feature triple api tuning (#2525)
* feat: introduce connection concept
* fix CI
* fix lint
---
client/action.go | 199 ++--
client/client.go | 104 +-
client/options.go | 306 +++--
client/options_test.go | 1186 +++++++++++++++++++-
config/application_config.go | 1 +
dubbo.go | 16 +-
global/consumer_config.go | 12 +-
global/reference_config.go | 6 +-
protocol/dubbo/dubbo_codec.go | 12 +-
.../example/new/client}/main.go | 39 +-
.../example/new/server}/main.go | 38 +-
.../triple/health/triple_health/health.triple.go | 26 +-
.../client/cmd_instance_with_registry/main.go | 2 +-
.../proto/triple_gen/greettriple/greet.triple.go | 35 +-
protocol/triple/triple_invoker.go | 1 +
server/action.go | 30 +-
server/options.go | 6 +
server/server.go | 13 +-
18 files changed, 1614 insertions(+), 418 deletions(-)
diff --git a/client/action.go b/client/action.go
index 7cc67a08d..b5a0fafd6 100644
--- a/client/action.go
+++ b/client/action.go
@@ -39,6 +39,7 @@ import (
"dubbo.apache.org/dubbo-go/v3/common/constant"
"dubbo.apache.org/dubbo-go/v3/common/extension"
"dubbo.apache.org/dubbo-go/v3/config"
+ "dubbo.apache.org/dubbo-go/v3/global"
"dubbo.apache.org/dubbo-go/v3/graceful_shutdown"
"dubbo.apache.org/dubbo-go/v3/protocol"
"dubbo.apache.org/dubbo-go/v3/protocol/protocolwrapper"
@@ -54,7 +55,7 @@ func getEnv(key, fallback string) string {
func updateOrCreateMeshURL(opts *ReferenceOptions) {
ref := opts.Reference
- con := opts.cliOpts.Consumer
+ con := opts.Consumer
if ref.URL != "" {
logger.Infof("URL specified explicitly %v", ref.URL)
@@ -98,8 +99,7 @@ func (refOpts *ReferenceOptions) ReferWithServiceAndInfo(srv
common.RPCService,
func (refOpts *ReferenceOptions) refer(srv common.RPCService, info
*ClientInfo) {
ref := refOpts.Reference
- clientOpts := refOpts.cliOpts
- con := clientOpts.Consumer
+ con := refOpts.Consumer
var methods []string
if info != nil {
@@ -107,8 +107,10 @@ func (refOpts *ReferenceOptions) refer(srv
common.RPCService, info *ClientInfo)
methods = info.MethodNames
refOpts.id = info.InterfaceName
refOpts.info = info
- } else {
+ } else if srv != nil {
refOpts.id = common.GetReference(srv)
+ } else {
+ refOpts.id = ref.InterfaceName
}
// If adaptive service is enabled,
// the cluster and load balance should be overridden to "adaptivesvc"
and "p2c" respectively.
@@ -139,77 +141,23 @@ func (refOpts *ReferenceOptions) refer(srv
common.RPCService, info *ClientInfo)
updateOrCreateMeshURL(refOpts)
// retrieving urls from config, and appending the urls to refOpts.urls
- if err := refOpts.processURL(cfgURL); err != nil {
+ urls, err := processURL(ref, refOpts.registriesCompat, cfgURL)
+ if err != nil {
panic(err)
}
- // Get invokers according to refOpts.urls
- var (
- invoker protocol.Invoker
- regURL *common.URL
- )
- invokers := make([]protocol.Invoker, len(refOpts.urls))
- for i, u := range refOpts.urls {
- if u.Protocol == constant.ServiceRegistryProtocol {
- invoker =
extension.GetProtocol(constant.RegistryProtocol).Refer(u)
- } else {
- invoker = extension.GetProtocol(u.Protocol).Refer(u)
- }
-
- if ref.URL != "" {
- invoker = protocolwrapper.BuildInvokerChain(invoker,
constant.ReferenceFilterKey)
- }
-
- invokers[i] = invoker
- if u.Protocol == constant.RegistryProtocol {
- regURL = u
- }
- }
-
- // TODO(hxmhlt): decouple from directory, config should not depend on
directory module
- if len(invokers) == 1 {
- refOpts.invoker = invokers[0]
- if ref.URL != "" {
- hitClu := constant.ClusterKeyFailover
- if u := refOpts.invoker.GetURL(); u != nil {
- hitClu = u.GetParam(constant.ClusterKey,
constant.ClusterKeyZoneAware)
- }
- cluster, err := extension.GetCluster(hitClu)
- if err != nil {
- panic(err)
- } else {
- refOpts.invoker =
cluster.Join(static.NewDirectory(invokers))
- }
- }
- } else {
- var hitClu string
- if regURL != nil {
- // for multi-subscription scenario, use 'zone-aware'
policy by default
- hitClu = constant.ClusterKeyZoneAware
- } else {
- // not a registry url, must be direct invoke.
- hitClu = constant.ClusterKeyFailover
- if u := invokers[0].GetURL(); u != nil {
- hitClu = u.GetParam(constant.ClusterKey,
constant.ClusterKeyZoneAware)
- }
- }
- cluster, err := extension.GetCluster(hitClu)
- if err != nil {
- panic(err)
- } else {
- refOpts.invoker =
cluster.Join(static.NewDirectory(invokers))
- }
+ // build invoker according to urls
+ invoker, err := buildInvoker(urls, ref)
+ if err != nil {
+ panic(err)
}
+ refOpts.urls = urls
+ refOpts.invoker = invoker
// publish consumer's metadata
publishServiceDefinition(cfgURL)
// create proxy
- if info == nil {
- // todo(DMwangnima): think about a more ideal way
- if con == nil {
- panic("client must be configured with ConsumerConfig
when using config.Load")
- }
-
+ if info == nil && srv != nil {
if ref.Async {
var callback common.CallbackResponse
if asyncSrv, ok := srv.(common.AsyncCallbackService);
ok {
@@ -226,8 +174,8 @@ func (refOpts *ReferenceOptions) refer(srv
common.RPCService, info *ClientInfo)
graceful_shutdown.RegisterProtocol(ref.Protocol)
}
-func (refOpts *ReferenceOptions) processURL(cfgURL *common.URL) error {
- ref := refOpts.Reference
+func processURL(ref *global.ReferenceConfig, regsCompat
map[string]*config.RegistryConfig, cfgURL *common.URL) ([]*common.URL, error) {
+ var urls []*common.URL
if ref.URL != "" { // use user-specific urls
/*
Two types of URL are allowed for refOpts.URL:
@@ -244,11 +192,11 @@ func (refOpts *ReferenceOptions) processURL(cfgURL
*common.URL) error {
for _, urlStr := range urlStrings {
serviceURL, err := common.NewURL(urlStr,
common.WithProtocol(ref.Protocol))
if err != nil {
- return fmt.Errorf(fmt.Sprintf("url
configuration error, please check your configuration, user specified URL %v
refer error, error message is %v ", urlStr, err.Error()))
+ return nil, fmt.Errorf(fmt.Sprintf("url
configuration error, please check your configuration, user specified URL %v
refer error, error message is %v ", urlStr, err.Error()))
}
if serviceURL.Protocol == constant.RegistryProtocol {
// serviceURL in this branch is a registry protocol
serviceURL.SubURL = cfgURL
- refOpts.urls = append(refOpts.urls, serviceURL)
+ urls = append(urls, serviceURL)
} else { // serviceURL in this branch is the target
endpoint IP address
if serviceURL.Path == "" {
serviceURL.Path = "/" +
ref.InterfaceName
@@ -257,17 +205,93 @@ func (refOpts *ReferenceOptions) processURL(cfgURL
*common.URL) error {
// other stuff, e.g. IP, port, etc., are same
as serviceURL
newURL := serviceURL.MergeURL(cfgURL)
newURL.AddParam("peer", "true")
- refOpts.urls = append(refOpts.urls, newURL)
+ urls = append(urls, newURL)
}
}
} else { // use registry configs
- refOpts.urls = config.LoadRegistries(ref.RegistryIDs,
refOpts.registriesCompat, common.CONSUMER)
+ urls = config.LoadRegistries(ref.RegistryIDs, regsCompat,
common.CONSUMER)
// set url to regURLs
- for _, regURL := range refOpts.urls {
+ for _, regURL := range urls {
regURL.SubURL = cfgURL
}
}
- return nil
+ return urls, nil
+}
+
+func buildInvoker(urls []*common.URL, ref *global.ReferenceConfig)
(protocol.Invoker, error) {
+ var (
+ invoker protocol.Invoker
+ regURL *common.URL
+ )
+ invokers := make([]protocol.Invoker, len(urls))
+ for i, u := range urls {
+ if u.Protocol == constant.ServiceRegistryProtocol {
+ invoker =
extension.GetProtocol(constant.RegistryProtocol).Refer(u)
+ } else {
+ invoker = extension.GetProtocol(u.Protocol).Refer(u)
+ }
+
+ if ref.URL != "" {
+ invoker = protocolwrapper.BuildInvokerChain(invoker,
constant.ReferenceFilterKey)
+ }
+
+ if u.Protocol == constant.RegistryProtocol {
+ regURL = u
+ }
+ invokers[i] = invoker
+ }
+
+ var resInvoker protocol.Invoker
+ // TODO(hxmhlt): decouple from directory, config should not depend on
directory module
+ if len(invokers) == 1 {
+ resInvoker = invokers[0]
+ if ref.URL != "" {
+ hitClu := constant.ClusterKeyFailover
+ if u := resInvoker.GetURL(); u != nil {
+ hitClu = u.GetParam(constant.ClusterKey,
constant.ClusterKeyZoneAware)
+ }
+ cluster, err := extension.GetCluster(hitClu)
+ if err != nil {
+ return nil, err
+ }
+ resInvoker = cluster.Join(static.NewDirectory(invokers))
+ }
+ return resInvoker, nil
+ }
+
+ var hitClu string
+ if regURL != nil {
+ // for multi-subscription scenario, use 'zone-aware' policy by
default
+ hitClu = constant.ClusterKeyZoneAware
+ } else {
+ // not a registry url, must be direct invoke.
+ hitClu = constant.ClusterKeyFailover
+ if u := invokers[0].GetURL(); u != nil {
+ hitClu = u.GetParam(constant.ClusterKey,
constant.ClusterKeyZoneAware)
+ }
+ }
+ cluster, err := extension.GetCluster(hitClu)
+ if err != nil {
+ return nil, err
+ }
+ resInvoker = cluster.Join(static.NewDirectory(invokers))
+
+ return resInvoker, nil
+}
+
+func publishServiceDefinition(url *common.URL) {
+ localService, err :=
extension.GetLocalMetadataService(constant.DefaultKey)
+ if err != nil {
+ logger.Warnf("get local metadata service failed, please check
if you have imported _ \"dubbo.apache.org/dubbo-go/v3/metadata/service/local\"")
+ return
+ }
+ localService.PublishServiceDefinition(url)
+ if url.GetParam(constant.MetadataTypeKey, "") !=
constant.RemoteMetadataStorageType {
+ return
+ }
+ if remoteMetadataService, err := extension.GetRemoteMetadataService();
err == nil && remoteMetadataService != nil {
+ remoteMetadataService.PublishServiceDefinition(url)
+ }
}
func (refOpts *ReferenceOptions) CheckAvailable() bool {
@@ -288,10 +312,8 @@ func (refOpts *ReferenceOptions) Implement(v
common.RPCService) {
if refOpts.pxy != nil {
refOpts.pxy.Implement(v)
} else if refOpts.info != nil {
- refOpts.info.ClientInjectFunc(v, &Client{
- cliOpts: refOpts.cliOpts,
- info: refOpts.info,
- refOpts: map[string]*ReferenceOptions{},
+ refOpts.info.ConnectionInjectFunc(v, &Connection{
+ refOpts: refOpts,
})
}
}
@@ -309,8 +331,8 @@ func (refOpts *ReferenceOptions) GetProxy() *proxy.Proxy {
func (refOpts *ReferenceOptions) getURLMap() url.Values {
ref := refOpts.Reference
app := refOpts.applicationCompat
- metrics := refOpts.cliOpts.Metrics
- tracing := refOpts.cliOpts.Otel.TracingConfig
+ metrics := refOpts.Metrics
+ tracing := refOpts.Otel.TracingConfig
urlMap := url.Values{}
// first set user params
@@ -397,18 +419,3 @@ func (refOpts *ReferenceOptions) postProcessConfig(url
*common.URL) {
p.PostProcessReferenceConfig(url)
}
}
-
-func publishServiceDefinition(url *common.URL) {
- localService, err :=
extension.GetLocalMetadataService(constant.DefaultKey)
- if err != nil {
- logger.Warnf("get local metadata service failed, please check
if you have imported _ \"dubbo.apache.org/dubbo-go/v3/metadata/service/local\"")
- return
- }
- localService.PublishServiceDefinition(url)
- if url.GetParam(constant.MetadataTypeKey, "") !=
constant.RemoteMetadataStorageType {
- return
- }
- if remoteMetadataService, err := extension.GetRemoteMetadataService();
err == nil && remoteMetadataService != nil {
- remoteMetadataService.PublishServiceDefinition(url)
- }
-}
diff --git a/client/client.go b/client/client.go
index b905c64a1..4a9a72269 100644
--- a/client/client.go
+++ b/client/client.go
@@ -20,32 +20,24 @@ package client
import (
"context"
- "fmt"
)
import (
- "github.com/pkg/errors"
-)
-
-import (
- "dubbo.apache.org/dubbo-go/v3/common"
"dubbo.apache.org/dubbo-go/v3/common/constant"
"dubbo.apache.org/dubbo-go/v3/protocol"
invocation_impl "dubbo.apache.org/dubbo-go/v3/protocol/invocation"
)
+// ConsumerConfig
type Client struct {
- info *ClientInfo
-
cliOpts *ClientOptions
- refOpts map[string]*ReferenceOptions
}
type ClientInfo struct {
- InterfaceName string
- MethodNames []string
- ClientInjectFunc func(dubboCliRaw interface{}, cli *Client)
- Meta map[string]interface{}
+ InterfaceName string
+ MethodNames []string
+ ConnectionInjectFunc func(dubboCliRaw interface{}, conn *Connection)
+ Meta map[string]interface{}
}
type ClientDefinition struct {
@@ -53,84 +45,95 @@ type ClientDefinition struct {
Info *ClientInfo
}
-func (cli *Client) call(ctx context.Context, paramsRawVals []interface{},
interfaceName, methodName, callType string, opts ...CallOption)
(protocol.Result, error) {
- // get a default CallOptions
- // apply CallOption
+// InterfaceName/group/version /ReferenceConfig
+type Connection struct {
+ refOpts *ReferenceOptions
+}
+
+func (conn *Connection) call(ctx context.Context, reqs []interface{}, resp
interface{}, methodName, callType string, opts ...CallOption) (protocol.Result,
error) {
options := newDefaultCallOptions()
for _, opt := range opts {
opt(options)
}
-
- inv, err := generateInvocation(methodName, paramsRawVals, callType,
options)
+ inv, err := generateInvocation(methodName, reqs, resp, callType,
options)
if err != nil {
return nil, err
}
-
- refOption := cli.refOpts[common.ServiceKey(interfaceName,
options.Group, options.Version)]
- if refOption == nil {
- return nil, fmt.Errorf("no service found for %s/%s:%s, please
check if the service has been registered", options.Group, interfaceName,
options.Version)
- }
-
- return refOption.invoker.Invoke(ctx, inv), nil
-
+ return conn.refOpts.invoker.Invoke(ctx, inv), nil
}
-func (cli *Client) CallUnary(ctx context.Context, req, resp interface{},
interfaceName, methodName string, opts ...CallOption) error {
- res, err := cli.call(ctx, []interface{}{req, resp}, interfaceName,
methodName, constant.CallUnary, opts...)
+func (conn *Connection) CallUnary(ctx context.Context, reqs []interface{},
resp interface{}, methodName string, opts ...CallOption) error {
+ res, err := conn.call(ctx, reqs, resp, methodName, constant.CallUnary,
opts...)
if err != nil {
return err
}
return res.Error()
}
-func (cli *Client) CallClientStream(ctx context.Context, interfaceName,
methodName string, opts ...CallOption) (interface{}, error) {
- res, err := cli.call(ctx, nil, interfaceName, methodName,
constant.CallClientStream, opts...)
+func (conn *Connection) CallClientStream(ctx context.Context, methodName
string, opts ...CallOption) (interface{}, error) {
+ res, err := conn.call(ctx, nil, nil, methodName,
constant.CallClientStream, opts...)
if err != nil {
return nil, err
}
return res.Result(), res.Error()
}
-func (cli *Client) CallServerStream(ctx context.Context, req interface{},
interfaceName, methodName string, opts ...CallOption) (interface{}, error) {
- res, err := cli.call(ctx, []interface{}{req}, interfaceName,
methodName, constant.CallServerStream, opts...)
+func (conn *Connection) CallServerStream(ctx context.Context, req interface{},
methodName string, opts ...CallOption) (interface{}, error) {
+ res, err := conn.call(ctx, []interface{}{req}, nil, methodName,
constant.CallServerStream, opts...)
if err != nil {
return nil, err
}
return res.Result(), res.Error()
}
-func (cli *Client) CallBidiStream(ctx context.Context, interfaceName,
methodName string, opts ...CallOption) (interface{}, error) {
- res, err := cli.call(ctx, nil, interfaceName, methodName,
constant.CallBidiStream, opts...)
+func (conn *Connection) CallBidiStream(ctx context.Context, methodName string,
opts ...CallOption) (interface{}, error) {
+ res, err := conn.call(ctx, nil, nil, methodName,
constant.CallBidiStream, opts...)
if err != nil {
return nil, err
}
return res.Result(), res.Error()
}
-func (cli *Client) Init(info *ClientInfo, opts ...ReferenceOption) (string,
string, error) {
- if info == nil {
- return "", "", errors.New("ClientInfo is nil")
- }
-
- newRefOptions := defaultReferenceOptions()
- err := newRefOptions.init(cli, opts...)
- if err != nil {
- return "", "", err
- }
+func (cli *Client) Dial(interfaceName string, opts ...ReferenceOption)
(*Connection, error) {
+ return cli.dial(interfaceName, nil, opts...)
+}
- ref := newRefOptions.Reference
- cli.refOpts[common.ServiceKey(info.InterfaceName, ref.Group,
ref.Version)] = newRefOptions
+func (cli *Client) DialWithInfo(interfaceName string, info *ClientInfo, opts
...ReferenceOption) (*Connection, error) {
+ return cli.dial(interfaceName, info, opts...)
+}
- newRefOptions.ReferWithInfo(info)
+func (cli *Client) dial(interfaceName string, info *ClientInfo, opts
...ReferenceOption) (*Connection, error) {
+ newRefOpts := defaultReferenceOptions()
+ finalOpts := []ReferenceOption{
+ setReference(cli.cliOpts.overallReference),
+ setApplicationCompat(cli.cliOpts.applicationCompat),
+ setRegistriesCompat(cli.cliOpts.registriesCompat),
+ setConsumer(cli.cliOpts.Consumer),
+ // this config must be set after Reference initialized
+ setInterfaceName(interfaceName),
+ }
+ finalOpts = append(finalOpts, opts...)
+ if err := newRefOpts.init(finalOpts...); err != nil {
+ return nil, err
+ }
+ newRefOpts.ReferWithInfo(info)
- return ref.Group, ref.Version, nil
+ return &Connection{refOpts: newRefOpts}, nil
}
-
-func generateInvocation(methodName string, paramsRawVals []interface{},
callType string, opts *CallOptions) (protocol.Invocation, error) {
+func generateInvocation(methodName string, reqs []interface{}, resp
interface{}, callType string, opts *CallOptions) (protocol.Invocation, error) {
+ var paramsRawVals []interface{}
+ for _, req := range reqs {
+ paramsRawVals = append(paramsRawVals, req)
+ }
+ if resp != nil {
+ paramsRawVals = append(paramsRawVals, resp)
+ }
inv := invocation_impl.NewRPCInvocationWithOptions(
invocation_impl.WithMethodName(methodName),
invocation_impl.WithAttachment(constant.TimeoutKey,
opts.RequestTimeout),
invocation_impl.WithAttachment(constant.RetriesKey,
opts.Retries),
+ invocation_impl.WithArguments(reqs),
+ invocation_impl.WithReply(resp),
invocation_impl.WithParameterRawValues(paramsRawVals),
)
inv.SetAttribute(constant.CallTypeKey, callType)
@@ -145,6 +148,5 @@ func NewClient(opts ...ClientOption) (*Client, error) {
}
return &Client{
cliOpts: newCliOpts,
- refOpts: make(map[string]*ReferenceOptions),
}, nil
}
diff --git a/client/options.go b/client/options.go
index 686075e6c..c301eeb6d 100644
--- a/client/options.go
+++ b/client/options.go
@@ -18,7 +18,6 @@
package client
import (
- "reflect"
"strconv"
"time"
)
@@ -31,7 +30,6 @@ import (
"dubbo.apache.org/dubbo-go/v3/common"
commonCfg "dubbo.apache.org/dubbo-go/v3/common/config"
"dubbo.apache.org/dubbo-go/v3/common/constant"
- "dubbo.apache.org/dubbo-go/v3/common/dubboutil"
"dubbo.apache.org/dubbo-go/v3/config"
"dubbo.apache.org/dubbo-go/v3/global"
"dubbo.apache.org/dubbo-go/v3/graceful_shutdown"
@@ -41,9 +39,10 @@ import (
)
type ReferenceOptions struct {
- Reference *global.ReferenceConfig
- cliOpts *ClientOptions
- Registries map[string]*global.RegistryConfig
+ Reference *global.ReferenceConfig
+ Consumer *global.ConsumerConfig
+ Metrics *global.MetricsConfig
+ Otel *global.OtelConfig
pxy *proxy.Proxy
id string
@@ -60,10 +59,12 @@ type ReferenceOptions struct {
func defaultReferenceOptions() *ReferenceOptions {
return &ReferenceOptions{
Reference: global.DefaultReferenceConfig(),
+ Metrics: global.DefaultMetricsConfig(),
+ Otel: global.DefaultOtelConfig(),
}
}
-func (refOpts *ReferenceOptions) init(cli *Client, opts ...ReferenceOption)
error {
+func (refOpts *ReferenceOptions) init(opts ...ReferenceOption) error {
for _, opt := range opts {
opt(refOpts)
}
@@ -71,11 +72,19 @@ func (refOpts *ReferenceOptions) init(cli *Client, opts
...ReferenceOption) erro
return err
}
- refOpts.cliOpts = cli.cliOpts
- dubboutil.CopyFields(reflect.ValueOf(refOpts.cliOpts.Consumer).Elem(),
reflect.ValueOf(refOpts.Reference).Elem())
-
ref := refOpts.Reference
+ app := refOpts.applicationCompat
+ if app != nil {
+ refOpts.metaDataType = app.MetadataType
+ if ref.Group == "" {
+ ref.Group = app.Group
+ }
+ if ref.Version == "" {
+ ref.Version = app.Version
+ }
+ }
+
// init method
methods := ref.Methods
if length := len(methods); length > 0 {
@@ -88,55 +97,43 @@ func (refOpts *ReferenceOptions) init(cli *Client, opts
...ReferenceOption) erro
}
}
- // init application
- application := refOpts.cliOpts.Application
- if application != nil {
- refOpts.applicationCompat = compatApplicationConfig(application)
- if err := refOpts.applicationCompat.Init(); err != nil {
- return err
- }
- refOpts.metaDataType = refOpts.applicationCompat.MetadataType
- if ref.Group == "" {
- ref.Group = refOpts.applicationCompat.Group
- }
- if ref.Version == "" {
- ref.Version = refOpts.applicationCompat.Version
- }
- }
// init cluster
if ref.Cluster == "" {
ref.Cluster = "failover"
}
- // todo(DMwangnima): move to registry package
// init registries
- var emptyRegIDsFlag bool
- if ref.RegistryIDs == nil || len(ref.RegistryIDs) <= 0 {
- emptyRegIDsFlag = true
- }
+ if len(refOpts.registriesCompat) > 0 {
+ regs := refOpts.registriesCompat
+ if len(ref.RegistryIDs) <= 0 {
+ ref.RegistryIDs = make([]string, len(regs))
+ for key := range regs {
+ ref.RegistryIDs = append(ref.RegistryIDs, key)
+ }
+ }
+ ref.RegistryIDs = commonCfg.TranslateIds(ref.RegistryIDs)
- // set client level as default registry
- regs := refOpts.Registries
- if regs == nil {
- regs = cli.cliOpts.Registries
+ newRegs := make(map[string]*config.RegistryConfig)
+ for _, id := range ref.RegistryIDs {
+ if reg, ok := regs[id]; ok {
+ newRegs[id] = reg
+ }
+ }
+ refOpts.registriesCompat = newRegs
}
- if regs != nil {
- refOpts.registriesCompat =
make(map[string]*config.RegistryConfig)
- for key, reg := range regs {
- refOpts.registriesCompat[key] =
compatRegistryConfig(reg)
- if err := refOpts.registriesCompat[key].Init(); err !=
nil {
- return err
- }
- if emptyRegIDsFlag {
- ref.RegistryIDs = append(ref.RegistryIDs, key)
- }
+ // init protocol
+ if ref.Protocol == "" {
+ ref.Protocol = "tri"
+ if refOpts.Consumer != nil && refOpts.Consumer.Protocol != "" {
+ ref.Protocol = refOpts.Consumer.Protocol
}
}
- ref.RegistryIDs = commonCfg.TranslateIds(ref.RegistryIDs)
- // init graceful_shutdown
- graceful_shutdown.Init(graceful_shutdown.SetShutdown_Config(refOpts.cliOpts.Shutdown))
+ // init serialization
+ if ref.Serialization == "" {
+ ref.Serialization = constant.ProtobufSerialization
+ }
return commonCfg.Verify(refOpts)
}
@@ -158,15 +155,13 @@ func WithURL(url string) ReferenceOption {
}
}
-// todo(DMwangnima): change Filter Option like Cluster and LoadBalance
func WithFilter(filter string) ReferenceOption {
return func(opts *ReferenceOptions) {
opts.Reference.Filter = filter
}
}
-// todo(DMwangnima): think about a more ideal configuration style
-func WithRegistryIDs(registryIDs []string) ReferenceOption {
+func WithRegistryIDs(registryIDs ...string) ReferenceOption {
return func(opts *ReferenceOptions) {
if len(registryIDs) > 0 {
opts.Reference.RegistryIDs = registryIDs
@@ -174,17 +169,6 @@ func WithRegistryIDs(registryIDs []string) ReferenceOption
{
}
}
-func WithRegistry(opts ...registry.Option) ReferenceOption {
- regOpts := registry.NewOptions(opts...)
-
- return func(refOpts *ReferenceOptions) {
- if refOpts.Registries == nil {
- refOpts.Registries =
make(map[string]*global.RegistryConfig)
- }
- refOpts.Registries[regOpts.ID] = regOpts.Registry
- }
-}
-
// ========== Cluster Strategy ==========
func WithClusterAvailable() ReferenceOption {
@@ -303,12 +287,18 @@ func WithVersion(version string) ReferenceOption {
}
}
-func WithJSON() ReferenceOption {
+func WithSerializationJSON() ReferenceOption {
return func(opts *ReferenceOptions) {
opts.Reference.Serialization = constant.JSONSerialization
}
}
+func WithSerialization(serialization string) ReferenceOption {
+ return func(opts *ReferenceOptions) {
+ opts.Reference.Serialization = serialization
+ }
+}
+
func WithProvidedBy(providedBy string) ReferenceOption {
return func(opts *ReferenceOptions) {
opts.Reference.ProvidedBy = providedBy
@@ -323,6 +313,9 @@ func WithAsync() ReferenceOption {
func WithParams(params map[string]string) ReferenceOption {
return func(opts *ReferenceOptions) {
+ if len(params) <= 0 {
+ return
+ }
opts.Reference.Params = params
}
}
@@ -333,9 +326,9 @@ func WithGeneric() ReferenceOption {
}
}
-func WithSticky(sticky bool) ReferenceOption {
+func WithSticky() ReferenceOption {
return func(opts *ReferenceOptions) {
- opts.Reference.Sticky = sticky
+ opts.Reference.Sticky = true
}
}
@@ -406,15 +399,33 @@ func WithParam(k, v string) ReferenceOption {
// ---------- For framework ----------
// These functions should not be invoked by users
-func SetRegistries(regs map[string]*global.RegistryConfig) ReferenceOption {
+func setReference(reference *global.ReferenceConfig) ReferenceOption {
return func(opts *ReferenceOptions) {
- opts.Registries = regs
+ opts.Reference = reference
}
}
-func SetReference(reference *global.ReferenceConfig) ReferenceOption {
+func setInterfaceName(interfaceName string) ReferenceOption {
return func(opts *ReferenceOptions) {
- opts.Reference = reference
+ opts.Reference.InterfaceName = interfaceName
+ }
+}
+
+func setApplicationCompat(app *config.ApplicationConfig) ReferenceOption {
+ return func(opts *ReferenceOptions) {
+ opts.applicationCompat = app
+ }
+}
+
+func setRegistriesCompat(regs map[string]*config.RegistryConfig)
ReferenceOption {
+ return func(opts *ReferenceOptions) {
+ opts.registriesCompat = regs
+ }
+}
+
+func setConsumer(consumer *global.ConsumerConfig) ReferenceOption {
+ return func(opts *ReferenceOptions) {
+ opts.Consumer = consumer
}
}
@@ -425,16 +436,21 @@ type ClientOptions struct {
Shutdown *global.ShutdownConfig
Metrics *global.MetricsConfig
Otel *global.OtelConfig
+
+ overallReference *global.ReferenceConfig
+ applicationCompat *config.ApplicationConfig
+ registriesCompat map[string]*config.RegistryConfig
}
func defaultClientOptions() *ClientOptions {
return &ClientOptions{
- Consumer: global.DefaultConsumerConfig(),
- Registries: make(map[string]*global.RegistryConfig),
- Application: global.DefaultApplicationConfig(),
- Shutdown: global.DefaultShutdownConfig(),
- Metrics: global.DefaultMetricsConfig(),
- Otel: global.DefaultOtelConfig(),
+ Consumer: global.DefaultConsumerConfig(),
+ Registries: make(map[string]*global.RegistryConfig),
+ Application: global.DefaultApplicationConfig(),
+ Shutdown: global.DefaultShutdownConfig(),
+ Metrics: global.DefaultMetricsConfig(),
+ Otel: global.DefaultOtelConfig(),
+ overallReference: global.DefaultReferenceConfig(),
}
}
@@ -445,6 +461,59 @@ func (cliOpts *ClientOptions) init(opts ...ClientOption)
error {
if err := defaults.Set(cliOpts); err != nil {
return err
}
+
+ con := cliOpts.Consumer
+
+ // init application
+ application := cliOpts.Application
+ if application != nil {
+ cliOpts.applicationCompat = compatApplicationConfig(application)
+ if err := cliOpts.applicationCompat.Init(); err != nil {
+ return err
+ }
+ }
+
+ // init registries
+ regs := cliOpts.Registries
+ if regs != nil {
+ cliOpts.registriesCompat =
make(map[string]*config.RegistryConfig)
+ if len(con.RegistryIDs) <= 0 {
+ con.RegistryIDs = make([]string, len(regs))
+ for key := range regs {
+ con.RegistryIDs = append(con.RegistryIDs, key)
+ }
+ }
+ con.RegistryIDs = commonCfg.TranslateIds(con.RegistryIDs)
+
+ for _, id := range con.RegistryIDs {
+ if reg, ok := regs[id]; ok {
+ cliOpts.registriesCompat[id] =
compatRegistryConfig(reg)
+ if err := cliOpts.registriesCompat[id].Init();
err != nil {
+ return err
+ }
+ }
+ }
+ }
+
+ // init cluster
+ if cliOpts.overallReference.Cluster == "" {
+ cliOpts.overallReference.Cluster = constant.ClusterKeyFailover
+ }
+
+ // init protocol
+ if cliOpts.Consumer.Protocol == "" {
+ cliOpts.Consumer.Protocol = "tri"
+ }
+
+ // init serialization
+ if cliOpts.overallReference.Serialization == "" {
+ cliOpts.overallReference.Serialization =
constant.ProtobufSerialization
+ }
+
+ // todo(DMwangnima): is there any part that we should do compatibility
processing?
+
+ // init graceful_shutdown
+ graceful_shutdown.Init(graceful_shutdown.SetShutdown_Config(cliOpts.Shutdown))
return nil
}
@@ -458,19 +527,20 @@ func WithClientCheck() ClientOption {
func WithClientURL(url string) ClientOption {
return func(opts *ClientOptions) {
- opts.Consumer.URL = url
+ opts.overallReference.URL = url
}
}
// todo(DMwangnima): change Filter Option like Cluster and LoadBalance
func WithClientFilter(filter string) ClientOption {
return func(opts *ClientOptions) {
+ // todo: move this to overallReference
opts.Consumer.Filter = filter
}
}
// todo(DMwangnima): think about a more ideal configuration style
-func WithClientRegistryIDs(registryIDs []string) ClientOption {
+func WithClientRegistryIDs(registryIDs ...string) ClientOption {
return func(opts *ClientOptions) {
if len(registryIDs) > 0 {
opts.Consumer.RegistryIDs = registryIDs
@@ -482,9 +552,6 @@ func WithClientRegistry(opts ...registry.Option)
ClientOption {
regOpts := registry.NewOptions(opts...)
return func(cliOpts *ClientOptions) {
- if cliOpts.Registries == nil {
- cliOpts.Registries =
make(map[string]*global.RegistryConfig)
- }
cliOpts.Registries[regOpts.ID] = regOpts.Registry
}
}
@@ -501,55 +568,61 @@ func WithClientShutdown(opts ...graceful_shutdown.Option)
ClientOption {
func WithClientClusterAvailable() ClientOption {
return func(opts *ClientOptions) {
- opts.Consumer.Cluster = constant.ClusterKeyAvailable
+ opts.overallReference.Cluster = constant.ClusterKeyAvailable
}
}
func WithClientClusterBroadcast() ClientOption {
return func(opts *ClientOptions) {
- opts.Consumer.Cluster = constant.ClusterKeyBroadcast
+ opts.overallReference.Cluster = constant.ClusterKeyBroadcast
}
}
func WithClientClusterFailBack() ClientOption {
return func(opts *ClientOptions) {
- opts.Consumer.Cluster = constant.ClusterKeyFailback
+ opts.overallReference.Cluster = constant.ClusterKeyFailback
}
}
func WithClientClusterFailFast() ClientOption {
return func(opts *ClientOptions) {
- opts.Consumer.Cluster = constant.ClusterKeyFailfast
+ opts.overallReference.Cluster = constant.ClusterKeyFailfast
}
}
func WithClientClusterFailOver() ClientOption {
return func(opts *ClientOptions) {
- opts.Consumer.Cluster = constant.ClusterKeyFailover
+ opts.overallReference.Cluster = constant.ClusterKeyFailover
}
}
func WithClientClusterFailSafe() ClientOption {
return func(opts *ClientOptions) {
- opts.Consumer.Cluster = constant.ClusterKeyFailsafe
+ opts.overallReference.Cluster = constant.ClusterKeyFailsafe
}
}
func WithClientClusterForking() ClientOption {
return func(opts *ClientOptions) {
- opts.Consumer.Cluster = constant.ClusterKeyForking
+ opts.overallReference.Cluster = constant.ClusterKeyForking
}
}
func WithClientClusterZoneAware() ClientOption {
return func(opts *ClientOptions) {
- opts.Consumer.Cluster = constant.ClusterKeyZoneAware
+ opts.overallReference.Cluster = constant.ClusterKeyZoneAware
}
}
func WithClientClusterAdaptiveService() ClientOption {
return func(opts *ClientOptions) {
- opts.Consumer.Cluster = constant.ClusterKeyAdaptiveService
+ opts.overallReference.Cluster =
constant.ClusterKeyAdaptiveService
+ }
+}
+
+func WithClientClusterStrategy(strategy string) ClientOption {
+ return func(opts *ClientOptions) {
+ opts.overallReference.Cluster = strategy
}
}
@@ -557,73 +630,75 @@ func WithClientClusterAdaptiveService() ClientOption {
func WithClientLoadBalanceConsistentHashing() ClientOption {
return func(opts *ClientOptions) {
- opts.Consumer.Loadbalance =
constant.LoadBalanceKeyConsistentHashing
+ opts.overallReference.Loadbalance =
constant.LoadBalanceKeyConsistentHashing
}
}
func WithClientLoadBalanceLeastActive() ClientOption {
return func(opts *ClientOptions) {
- opts.Consumer.Loadbalance = constant.LoadBalanceKeyLeastActive
+ opts.overallReference.Loadbalance =
constant.LoadBalanceKeyLeastActive
}
}
func WithClientLoadBalanceRandom() ClientOption {
return func(opts *ClientOptions) {
- opts.Consumer.Loadbalance = constant.LoadBalanceKeyRandom
+ opts.overallReference.Loadbalance =
constant.LoadBalanceKeyRandom
}
}
func WithClientLoadBalanceRoundRobin() ClientOption {
return func(opts *ClientOptions) {
- opts.Consumer.Loadbalance = constant.LoadBalanceKeyRoundRobin
+ opts.overallReference.Loadbalance =
constant.LoadBalanceKeyRoundRobin
}
}
func WithClientLoadBalanceP2C() ClientOption {
return func(opts *ClientOptions) {
- opts.Consumer.Loadbalance = constant.LoadBalanceKeyP2C
+ opts.overallReference.Loadbalance = constant.LoadBalanceKeyP2C
}
}
func WithClientLoadBalance(lb string) ClientOption {
return func(opts *ClientOptions) {
- opts.Consumer.Loadbalance = lb
+ opts.overallReference.Loadbalance = lb
}
}
func WithClientRetries(retries int) ClientOption {
return func(opts *ClientOptions) {
- opts.Consumer.Retries = strconv.Itoa(retries)
+ opts.overallReference.Retries = strconv.Itoa(retries)
}
}
+// TODO(review): is a client-level Group option needed?
func WithClientGroup(group string) ClientOption {
return func(opts *ClientOptions) {
- opts.Consumer.Group = group
+ opts.overallReference.Group = group
}
}
+// TODO(review): is a client-level Version option needed?
func WithClientVersion(version string) ClientOption {
return func(opts *ClientOptions) {
- opts.Consumer.Version = version
+ opts.overallReference.Version = version
}
}
func WithClientSerializationJSON() ClientOption {
return func(opts *ClientOptions) {
- opts.Consumer.Serialization = constant.JSONSerialization
+ opts.overallReference.Serialization = constant.JSONSerialization
}
}
func WithClientSerialization(ser string) ClientOption {
return func(opts *ClientOptions) {
- opts.Consumer.Serialization = ser
+ opts.overallReference.Serialization = ser
}
}
func WithClientProvidedBy(providedBy string) ClientOption {
return func(opts *ClientOptions) {
- opts.Consumer.ProvidedBy = providedBy
+ opts.overallReference.ProvidedBy = providedBy
}
}
@@ -636,16 +711,19 @@ func WithClientProvidedBy(providedBy string) ClientOption
{
func WithClientParams(params map[string]string) ClientOption {
return func(opts *ClientOptions) {
- opts.Consumer.Params = params
+ if len(params) <= 0 {
+ return
+ }
+ opts.overallReference.Params = params
}
}
func WithClientParam(k, v string) ClientOption {
return func(opts *ClientOptions) {
- if opts.Consumer.Params == nil {
- opts.Consumer.Params = make(map[string]string)
+ if opts.overallReference.Params == nil {
+ opts.overallReference.Params = make(map[string]string,
8)
}
- opts.Consumer.Params[k] = v
+ opts.overallReference.Params[k] = v
}
}
@@ -660,9 +738,9 @@ func WithClientParam(k, v string) ClientOption {
// }
//}
-func WithClientSticky(sticky bool) ClientOption {
+func WithClientSticky() ClientOption {
return func(opts *ClientOptions) {
- opts.Consumer.Sticky = sticky
+ opts.overallReference.Sticky = true
}
}
@@ -700,13 +778,13 @@ func WithClientRequestTimeout(timeout time.Duration)
ClientOption {
func WithClientForceTag() ClientOption {
return func(opts *ClientOptions) {
- opts.Consumer.ForceTag = true
+ opts.overallReference.ForceTag = true
}
}
func WithClientMeshProviderPort(port int) ClientOption {
return func(opts *ClientOptions) {
- opts.Consumer.MeshProviderPort = port
+ opts.overallReference.MeshProviderPort = port
}
}
@@ -750,8 +828,6 @@ func SetClientOtel(otel *global.OtelConfig) ClientOption {
type CallOptions struct {
RequestTimeout string
Retries string
- Group string
- Version string
}
type CallOption func(*CallOptions)
@@ -773,15 +849,3 @@ func WithCallRetries(retries int) CallOption {
opts.Retries = strconv.Itoa(retries)
}
}
-
-func WithCallGroup(group string) CallOption {
- return func(opts *CallOptions) {
- opts.Group = group
- }
-}
-
-func WithCallVersion(version string) CallOption {
- return func(opts *CallOptions) {
- opts.Version = version
- }
-}
diff --git a/client/options_test.go b/client/options_test.go
index 0016b916d..c2edacc03 100644
--- a/client/options_test.go
+++ b/client/options_test.go
@@ -17,37 +17,1179 @@
package client
-//func TestWithURL(t *testing.T) {
-// tests := []struct {
-// opts []ClientOption
-// justify func(t *testing.T, opts *ClientOptions)
-// }{
+import (
+ "testing"
+ "time"
+)
+
+import (
+ "github.com/stretchr/testify/assert"
+)
+
+import (
+ "dubbo.apache.org/dubbo-go/v3/common/constant"
+ "dubbo.apache.org/dubbo-go/v3/registry"
+)
+
+type newClientCase struct {
+ desc string
+ opts []ClientOption
+ verify func(t *testing.T, cli *Client, err error)
+}
+
+func processNewClientCases(t *testing.T, cases []newClientCase) {
+ for _, c := range cases {
+ t.Run(c.desc, func(t *testing.T) {
+ cli, err := NewClient(c.opts...)
+ c.verify(t, cli, err)
+ })
+ }
+}
+
+// ---------- ClientOption Testing ----------
+
+// todo: verify
+func TestWithClientURL(t *testing.T) {
+ cases := []newClientCase{
+ {
+ desc: "normal address",
+ opts: []ClientOption{
+ WithClientURL("127.0.0.1:20000"),
+ },
+ verify: func(t *testing.T, cli *Client, err error) {
+ assert.Nil(t, err)
+ assert.Equal(t, "127.0.0.1:20000",
cli.cliOpts.overallReference.URL)
+ },
+ },
+ }
+ processNewClientCases(t, cases)
+}
+
+func TestWithClientCheck(t *testing.T) {
+ cases := []newClientCase{
+ {
+ desc: "config check",
+ opts: []ClientOption{
+ WithClientCheck(),
+ },
+ verify: func(t *testing.T, cli *Client, err error) {
+ assert.Nil(t, err)
+ assert.Equal(t, true,
cli.cliOpts.Consumer.Check)
+ },
+ },
+ }
+ processNewClientCases(t, cases)
+}
+
+func TestWithClientFilter(t *testing.T) {
+ cases := []newClientCase{
+ {
+ desc: "config normal filter",
+ opts: []ClientOption{
+ WithClientFilter("test_filter"),
+ },
+ verify: func(t *testing.T, cli *Client, err error) {
+ assert.Nil(t, err)
+ assert.Equal(t, "test_filter",
cli.cliOpts.Consumer.Filter)
+ },
+ },
+ }
+ processNewClientCases(t, cases)
+}
+
+func TestWithClientRegistryIDs(t *testing.T) {
+ cases := []newClientCase{
+ {
+ desc: "config normal ids",
+ opts: []ClientOption{
+ WithClientRegistryIDs("zk", "nacos"),
+ },
+ verify: func(t *testing.T, cli *Client, err error) {
+ assert.Nil(t, err)
+ assert.Equal(t, []string{"zk", "nacos"},
cli.cliOpts.Consumer.RegistryIDs)
+ },
+ },
+ }
+ processNewClientCases(t, cases)
+}
+
+func TestWithClientRegistry(t *testing.T) {
+ cases := []newClientCase{
+ {
+ desc: "config registry without setting id explicitly",
+ opts: []ClientOption{
+ WithClientRegistry(
+ registry.WithZookeeper(),
+ registry.WithAddress("127.0.0.1:2181"),
+ ),
+ },
+ verify: func(t *testing.T, cli *Client, err error) {
+ assert.Nil(t, err)
+ reg, ok :=
cli.cliOpts.Registries[constant.ZookeeperKey]
+ assert.True(t, ok)
+ assert.Equal(t, "127.0.0.1:2181", reg.Address)
+ regCompat, ok :=
cli.cliOpts.registriesCompat[constant.ZookeeperKey]
+ assert.True(t, ok)
+ assert.Equal(t, "127.0.0.1:2181",
regCompat.Address)
+ },
+ },
+ {
+ desc: "config registry with setting id explicitly",
+ opts: []ClientOption{
+ WithClientRegistry(
+ registry.WithID("zk"),
+ registry.WithZookeeper(),
+ registry.WithAddress("127.0.0.1:2181"),
+ ),
+ },
+ verify: func(t *testing.T, cli *Client, err error) {
+ assert.Nil(t, err)
+ reg, ok := cli.cliOpts.Registries["zk"]
+ assert.True(t, ok)
+ assert.Equal(t, "127.0.0.1:2181", reg.Address)
+ regCompat, ok :=
cli.cliOpts.registriesCompat["zk"]
+ assert.True(t, ok)
+ assert.Equal(t, "127.0.0.1:2181",
regCompat.Address)
+ },
+ },
+ {
+ desc: "config multiple registries with setting
RegistryIds",
+ opts: []ClientOption{
+ WithClientRegistry(
+ registry.WithZookeeper(),
+ registry.WithAddress("127.0.0.1:2181"),
+ ),
+ WithClientRegistry(
+ registry.WithID("nacos_test"),
+ registry.WithNacos(),
+ registry.WithAddress("127.0.0.1:8848"),
+ ),
+ WithClientRegistryIDs("nacos_test"),
+ },
+ verify: func(t *testing.T, cli *Client, err error) {
+ assert.Nil(t, err)
+ zkReg, ok :=
cli.cliOpts.Registries[constant.ZookeeperKey]
+ assert.True(t, ok)
+ assert.Equal(t, "127.0.0.1:2181", zkReg.Address)
+ ncReg, ok :=
cli.cliOpts.Registries["nacos_test"]
+ assert.True(t, ok)
+ assert.Equal(t, "127.0.0.1:8848", ncReg.Address)
+
+ _, ok =
cli.cliOpts.registriesCompat[constant.ZookeeperKey]
+ assert.False(t, ok)
+ ncCompat, ok :=
cli.cliOpts.registriesCompat["nacos_test"]
+ assert.True(t, ok)
+ assert.Equal(t, "127.0.0.1:8848",
ncCompat.Address)
+ },
+ },
+ }
+ processNewClientCases(t, cases)
+}
+
+func TestWithClientShutdown(t *testing.T) {
+ cases := []newClientCase{
+ {
+ desc: "config shutdown",
+ opts: []ClientOption{
+ WithClientShutdown(),
+ },
+ verify: func(t *testing.T, cli *Client, err error) {
+ assert.Nil(t, err)
+ // we do not verify the internal fields of
Shutdown since graceful_shutdown module is in charge of it
+ assert.NotNil(t, cli.cliOpts.Shutdown)
+ },
+ },
+ }
+ processNewClientCases(t, cases)
+}
+
+func TestWithClientCluster(t *testing.T) {
+ cases := []newClientCase{
+ {
+ desc: "default Cluster strategy",
+ opts: []ClientOption{},
+ verify: func(t *testing.T, cli *Client, err error) {
+ assert.Nil(t, err)
+ assert.Equal(t, constant.ClusterKeyFailover,
cli.cliOpts.overallReference.Cluster)
+ },
+ },
+ {
+ desc: "config Available Cluster strategy",
+ opts: []ClientOption{
+ WithClientClusterAvailable(),
+ },
+ verify: func(t *testing.T, cli *Client, err error) {
+ assert.Nil(t, err)
+ assert.Equal(t, constant.ClusterKeyAvailable,
cli.cliOpts.overallReference.Cluster)
+ },
+ },
+ {
+ desc: "config Broadcast Cluster strategy",
+ opts: []ClientOption{
+ WithClientClusterBroadcast(),
+ },
+ verify: func(t *testing.T, cli *Client, err error) {
+ assert.Nil(t, err)
+ assert.Equal(t, constant.ClusterKeyBroadcast,
cli.cliOpts.overallReference.Cluster)
+ },
+ },
+ {
+ desc: "config FailBack Cluster strategy",
+ opts: []ClientOption{
+ WithClientClusterFailBack(),
+ },
+ verify: func(t *testing.T, cli *Client, err error) {
+ assert.Nil(t, err)
+ assert.Equal(t, constant.ClusterKeyFailback,
cli.cliOpts.overallReference.Cluster)
+ },
+ },
+ {
+ desc: "config FailFast Cluster strategy",
+ opts: []ClientOption{
+ WithClientClusterFailFast(),
+ },
+ verify: func(t *testing.T, cli *Client, err error) {
+ assert.Nil(t, err)
+ assert.Equal(t, constant.ClusterKeyFailfast,
cli.cliOpts.overallReference.Cluster)
+ },
+ },
+ {
+ desc: "config FailOver Cluster strategy",
+ opts: []ClientOption{
+ WithClientClusterFailOver(),
+ },
+ verify: func(t *testing.T, cli *Client, err error) {
+ assert.Nil(t, err)
+ assert.Equal(t, constant.ClusterKeyFailover,
cli.cliOpts.overallReference.Cluster)
+ },
+ },
+ {
+ desc: "config FailSafe Cluster strategy",
+ opts: []ClientOption{
+ WithClientClusterFailSafe(),
+ },
+ verify: func(t *testing.T, cli *Client, err error) {
+ assert.Nil(t, err)
+ assert.Equal(t, constant.ClusterKeyFailsafe,
cli.cliOpts.overallReference.Cluster)
+ },
+ },
+ {
+ desc: "config Forking Cluster strategy",
+ opts: []ClientOption{
+ WithClientClusterForking(),
+ },
+ verify: func(t *testing.T, cli *Client, err error) {
+ assert.Nil(t, err)
+ assert.Equal(t, constant.ClusterKeyForking,
cli.cliOpts.overallReference.Cluster)
+ },
+ },
+ {
+ desc: "config ZoneAware Cluster strategy",
+ opts: []ClientOption{
+ WithClientClusterZoneAware(),
+ },
+ verify: func(t *testing.T, cli *Client, err error) {
+ assert.Nil(t, err)
+ assert.Equal(t, constant.ClusterKeyZoneAware,
cli.cliOpts.overallReference.Cluster)
+ },
+ },
+ {
+ desc: "config AdaptiveService Cluster strategy",
+ opts: []ClientOption{
+ WithClientClusterAdaptiveService(),
+ },
+ verify: func(t *testing.T, cli *Client, err error) {
+ assert.Nil(t, err)
+ assert.Equal(t,
constant.ClusterKeyAdaptiveService, cli.cliOpts.overallReference.Cluster)
+ },
+ },
+ }
+ processNewClientCases(t, cases)
+}
+
+func TestWithClientLoadBalance(t *testing.T) {
+ cases := []newClientCase{
+ // todo(DMwangnima): think about default loadbalance strategy
+ //{
+ // desc: "default Cluster strategy",
+ // opts: []ClientOption{},
+ // verify: func(t *testing.T, cli *Client, err error) {
+ // assert.Nil(t, err)
+ // assert.Equal(t, constant.ClusterKeyFailover,
cli.cliOpts.overallReference.Cluster)
+ // },
+ //},
+ {
+ desc: "config ConsistentHashing LoadBalance strategy",
+ opts: []ClientOption{
+ WithClientLoadBalanceConsistentHashing(),
+ },
+ verify: func(t *testing.T, cli *Client, err error) {
+ assert.Nil(t, err)
+ assert.Equal(t,
constant.LoadBalanceKeyConsistentHashing,
cli.cliOpts.overallReference.Loadbalance)
+ },
+ },
+ {
+ desc: "config LeastActive LoadBalance strategy",
+ opts: []ClientOption{
+ WithClientLoadBalanceLeastActive(),
+ },
+ verify: func(t *testing.T, cli *Client, err error) {
+ assert.Nil(t, err)
+ assert.Equal(t,
constant.LoadBalanceKeyLeastActive, cli.cliOpts.overallReference.Loadbalance)
+ },
+ },
+ {
+ desc: "config Random LoadBalance strategy",
+ opts: []ClientOption{
+ WithClientLoadBalanceRandom(),
+ },
+ verify: func(t *testing.T, cli *Client, err error) {
+ assert.Nil(t, err)
+ assert.Equal(t, constant.LoadBalanceKeyRandom,
cli.cliOpts.overallReference.Loadbalance)
+ },
+ },
+ {
+ desc: "config RoundRobin LoadBalance strategy",
+ opts: []ClientOption{
+ WithClientLoadBalanceRoundRobin(),
+ },
+ verify: func(t *testing.T, cli *Client, err error) {
+ assert.Nil(t, err)
+ assert.Equal(t,
constant.LoadBalanceKeyRoundRobin, cli.cliOpts.overallReference.Loadbalance)
+ },
+ },
+ {
+ desc: "config P2C LoadBalance strategy",
+ opts: []ClientOption{
+ WithClientLoadBalanceP2C(),
+ },
+ verify: func(t *testing.T, cli *Client, err error) {
+ assert.Nil(t, err)
+ assert.Equal(t, constant.LoadBalanceKeyP2C,
cli.cliOpts.overallReference.Loadbalance)
+ },
+ },
+ }
+ processNewClientCases(t, cases)
+}
+
+func TestWithClientRetries(t *testing.T) {
+ cases := []newClientCase{
+ {
+ desc: "config normal retries",
+ opts: []ClientOption{
+ WithClientRetries(3),
+ },
+ verify: func(t *testing.T, cli *Client, err error) {
+ assert.Nil(t, err)
+ assert.Equal(t, "3",
cli.cliOpts.overallReference.Retries)
+ },
+ },
+ }
+ processNewClientCases(t, cases)
+}
+
+func TestWithClientGroup(t *testing.T) {
+ cases := []newClientCase{
+ {
+ desc: "config normal group",
+ opts: []ClientOption{
+ WithClientGroup("test_group"),
+ },
+ verify: func(t *testing.T, cli *Client, err error) {
+ assert.Nil(t, err)
+ assert.Equal(t, "test_group",
cli.cliOpts.overallReference.Group)
+ },
+ },
+ }
+ processNewClientCases(t, cases)
+}
+
+func TestWithClientVersion(t *testing.T) {
+ cases := []newClientCase{
+ {
+ desc: "config normal version",
+ opts: []ClientOption{
+ WithClientVersion("test_version"),
+ },
+ verify: func(t *testing.T, cli *Client, err error) {
+ assert.Nil(t, err)
+ assert.Equal(t, "test_version",
cli.cliOpts.overallReference.Version)
+ },
+ },
+ }
+ processNewClientCases(t, cases)
+}
+
+func TestWithClientSerialization(t *testing.T) {
+ cases := []newClientCase{
+ {
+ desc: "default Serialization",
+ opts: []ClientOption{},
+ verify: func(t *testing.T, cli *Client, err error) {
+ assert.Nil(t, err)
+ assert.Equal(t, constant.ProtobufSerialization,
cli.cliOpts.overallReference.Serialization)
+ },
+ },
+ {
+ desc: "config JSON Serialization",
+ opts: []ClientOption{
+ WithClientSerializationJSON(),
+ },
+ verify: func(t *testing.T, cli *Client, err error) {
+ assert.Nil(t, err)
+ assert.Equal(t, constant.JSONSerialization,
cli.cliOpts.overallReference.Serialization)
+ },
+ },
+ }
+ processNewClientCases(t, cases)
+}
+
+func TestWithClientProvidedBy(t *testing.T) {
+ cases := []newClientCase{
+ {
+ desc: "config normal ProvidedBy",
+ opts: []ClientOption{
+ WithClientProvidedBy("test_instance"),
+ },
+ verify: func(t *testing.T, cli *Client, err error) {
+ assert.Nil(t, err)
+ assert.Equal(t, "test_instance",
cli.cliOpts.overallReference.ProvidedBy)
+ },
+ },
+ }
+ processNewClientCases(t, cases)
+}
+
+func TestWithClientParams(t *testing.T) {
+ cases := []newClientCase{
+ {
+ desc: "config normal params",
+ opts: []ClientOption{
+ WithClientParams(map[string]string{
+ "test_key": "test_val",
+ }),
+ },
+ verify: func(t *testing.T, cli *Client, err error) {
+ assert.Nil(t, err)
+ assert.Equal(t, map[string]string{"test_key":
"test_val"}, cli.cliOpts.overallReference.Params)
+ },
+ },
+ {
+ desc: "config nil params",
+ opts: []ClientOption{
+ WithClientParams(nil),
+ },
+ verify: func(t *testing.T, cli *Client, err error) {
+ assert.Nil(t, err)
+ assert.Nil(t,
cli.cliOpts.overallReference.Params)
+ },
+ },
+ {
+ desc: "config nil params with type information",
+ opts: []ClientOption{
+ WithClientParams((map[string]string)(nil)),
+ },
+ verify: func(t *testing.T, cli *Client, err error) {
+ assert.Nil(t, err)
+ assert.Nil(t,
cli.cliOpts.overallReference.Params)
+ },
+ },
+ {
+ desc: "config params without key-val",
+ opts: []ClientOption{
+ WithClientParams(map[string]string{}),
+ },
+ verify: func(t *testing.T, cli *Client, err error) {
+ assert.Nil(t, err)
+ assert.Nil(t,
cli.cliOpts.overallReference.Params)
+ },
+ },
+ }
+ processNewClientCases(t, cases)
+}
+
+func TestWithClientParam(t *testing.T) {
+ cases := []newClientCase{
+ {
+ desc: "config normal param",
+ opts: []ClientOption{
+ WithClientParam("test_key", "test_val"),
+ },
+ verify: func(t *testing.T, cli *Client, err error) {
+ assert.Nil(t, err)
+ assert.Equal(t, map[string]string{"test_key":
"test_val"}, cli.cliOpts.overallReference.Params)
+ },
+ },
+ {
+ desc: "config normal param multiple times",
+ opts: []ClientOption{
+ WithClientParam("test_key", "test_val"),
+ WithClientParam("test_key1", "test_val1"),
+ },
+ verify: func(t *testing.T, cli *Client, err error) {
+ assert.Nil(t, err)
+ assert.Equal(t, map[string]string{"test_key":
"test_val", "test_key1": "test_val1"}, cli.cliOpts.overallReference.Params)
+ },
+ },
+ {
+ desc: "config param with empty key",
+ opts: []ClientOption{
+ WithClientParam("", ""),
+ },
+ verify: func(t *testing.T, cli *Client, err error) {
+ assert.Nil(t, err)
+ assert.Equal(t, map[string]string{"": ""},
cli.cliOpts.overallReference.Params)
+ },
+ },
+ }
+ processNewClientCases(t, cases)
+}
+
+func TestWithClientSticky(t *testing.T) {
+ cases := []newClientCase{
+ {
+ desc: "config sticky",
+ opts: []ClientOption{
+ WithClientSticky(),
+ },
+ verify: func(t *testing.T, cli *Client, err error) {
+ assert.Nil(t, err)
+ assert.True(t,
cli.cliOpts.overallReference.Sticky)
+ },
+ },
+ }
+ processNewClientCases(t, cases)
+}
+
+func TestWithClientProtocol(t *testing.T) {
+ cases := []newClientCase{
+ {
+ desc: "default Protocol",
+ opts: []ClientOption{},
+ verify: func(t *testing.T, cli *Client, err error) {
+ assert.Nil(t, err)
+ assert.Equal(t, "tri",
cli.cliOpts.Consumer.Protocol)
+ },
+ },
+ {
+ desc: "config Dubbo Protocol",
+ opts: []ClientOption{
+ WithClientProtocolDubbo(),
+ },
+ verify: func(t *testing.T, cli *Client, err error) {
+ assert.Nil(t, err)
+ assert.Equal(t, constant.Dubbo,
cli.cliOpts.Consumer.Protocol)
+ },
+ },
+ {
+ desc: "config Triple Protocol",
+ opts: []ClientOption{
+ WithClientProtocolTriple(),
+ },
+ verify: func(t *testing.T, cli *Client, err error) {
+ assert.Nil(t, err)
+ assert.Equal(t, "tri",
cli.cliOpts.Consumer.Protocol)
+ },
+ },
+ {
+ desc: "config JsonRPC Protocol",
+ opts: []ClientOption{
+ WithClientProtocolJsonRPC(),
+ },
+ verify: func(t *testing.T, cli *Client, err error) {
+ assert.Nil(t, err)
+ assert.Equal(t, "jsonrpc",
cli.cliOpts.Consumer.Protocol)
+ },
+ },
+ }
+ processNewClientCases(t, cases)
+}
+
+func TestWithClientRequestTimeout(t *testing.T) {
+ cases := []newClientCase{
+ {
+ desc: "config normal RequestTimeout",
+ opts: []ClientOption{
+ WithClientRequestTimeout(6 * time.Second),
+ },
+ verify: func(t *testing.T, cli *Client, err error) {
+ assert.Nil(t, err)
+ assert.Equal(t, "6s",
cli.cliOpts.Consumer.RequestTimeout)
+ },
+ },
+ // todo(DMwangnima): consider whether this default timeout is
ideal
+ {
+ desc: "default RequestTimeout",
+ opts: []ClientOption{},
+ verify: func(t *testing.T, cli *Client, err error) {
+ assert.Nil(t, err)
+ assert.Equal(t, "3s",
cli.cliOpts.Consumer.RequestTimeout)
+ },
+ },
+ }
+ processNewClientCases(t, cases)
+}
+
+func TestWithClientForceTag(t *testing.T) {
+ cases := []newClientCase{
+ {
+ desc: "config ForceTag",
+ opts: []ClientOption{
+ WithClientForceTag(),
+ },
+ verify: func(t *testing.T, cli *Client, err error) {
+ assert.Nil(t, err)
+ assert.True(t,
cli.cliOpts.overallReference.ForceTag)
+ },
+ },
+ }
+ processNewClientCases(t, cases)
+}
+
+func TestWithClientMeshProviderPort(t *testing.T) {
+ cases := []newClientCase{
+ {
+ desc: "config normal MeshProviderPort",
+ opts: []ClientOption{
+ WithClientMeshProviderPort(20001),
+ },
+ verify: func(t *testing.T, cli *Client, err error) {
+ assert.Nil(t, err)
+ assert.Equal(t, 20001,
cli.cliOpts.overallReference.MeshProviderPort)
+ },
+ },
+ }
+ processNewClientCases(t, cases)
+}
+
+// ---------- ReferenceOption Testing ----------
+
+type referenceOptionsInitCase struct {
+ desc string
+ opts []ReferenceOption
+ verify func(t *testing.T, refOpts *ReferenceOptions, err error)
+}
+
+func processReferenceOptionsInitCases(t *testing.T, cases
[]referenceOptionsInitCase) {
+ for _, c := range cases {
+ t.Run(c.desc, func(t *testing.T) {
+ defRefOpts := defaultReferenceOptions()
+ err := defRefOpts.init(c.opts...)
+ c.verify(t, defRefOpts, err)
+ })
+ }
+}
+
+func TestWithCheck(t *testing.T) {
+ cases := []referenceOptionsInitCase{
+ {
+ desc: "config Check",
+ opts: []ReferenceOption{
+ WithCheck(),
+ },
+ verify: func(t *testing.T, refOpts *ReferenceOptions,
err error) {
+ assert.Nil(t, err)
+ assert.True(t, *refOpts.Reference.Check)
+ },
+ },
+ }
+ processReferenceOptionsInitCases(t, cases)
+}
+
+func TestWithURL(t *testing.T) {
+ cases := []referenceOptionsInitCase{
+ {
+ desc: "config normal URL",
+ opts: []ReferenceOption{
+ WithURL("127.0.0.1:20000"),
+ },
+ verify: func(t *testing.T, refOpts *ReferenceOptions,
err error) {
+ assert.Nil(t, err)
+ assert.Equal(t, "127.0.0.1:20000",
refOpts.Reference.URL)
+ },
+ },
+ }
+ processReferenceOptionsInitCases(t, cases)
+}
+
+func TestWithFilter(t *testing.T) {
+ cases := []referenceOptionsInitCase{
+ {
+ desc: "config normal filter",
+ opts: []ReferenceOption{
+ WithFilter("test_filter"),
+ },
+ verify: func(t *testing.T, refOpts *ReferenceOptions,
err error) {
+ assert.Nil(t, err)
+ assert.Equal(t, "test_filter",
refOpts.Reference.Filter)
+ },
+ },
+ }
+ processReferenceOptionsInitCases(t, cases)
+}
+
+func TestWithRegistryIDs(t *testing.T) {
+ cases := []referenceOptionsInitCase{
+ {
+ desc: "config normal ids",
+ opts: []ReferenceOption{
+ WithRegistryIDs("zk", "nacos"),
+ },
+ verify: func(t *testing.T, refOpts *ReferenceOptions,
err error) {
+ assert.Nil(t, err)
+ assert.Equal(t, []string{"zk", "nacos"},
refOpts.Reference.RegistryIDs)
+ },
+ },
+ }
+ processReferenceOptionsInitCases(t, cases)
+}
+
+func TestWithCluster(t *testing.T) {
+ cases := []referenceOptionsInitCase{
+ {
+ desc: "default Cluster strategy",
+ opts: []ReferenceOption{},
+ verify: func(t *testing.T, refOpts *ReferenceOptions,
err error) {
+ assert.Nil(t, err)
+ assert.Equal(t, constant.ClusterKeyFailover,
refOpts.Reference.Cluster)
+ },
+ },
+ {
+ desc: "config Available Cluster strategy",
+ opts: []ReferenceOption{
+ WithClusterAvailable(),
+ },
+ verify: func(t *testing.T, refOpts *ReferenceOptions,
err error) {
+ assert.Nil(t, err)
+ assert.Equal(t, constant.ClusterKeyAvailable,
refOpts.Reference.Cluster)
+ },
+ },
+ {
+ desc: "config Broadcast Cluster strategy",
+ opts: []ReferenceOption{
+ WithClusterBroadcast(),
+ },
+ verify: func(t *testing.T, refOpts *ReferenceOptions,
err error) {
+ assert.Nil(t, err)
+ assert.Equal(t, constant.ClusterKeyBroadcast,
refOpts.Reference.Cluster)
+ },
+ },
+ {
+ desc: "config FailBack Cluster strategy",
+ opts: []ReferenceOption{
+ WithClusterFailBack(),
+ },
+ verify: func(t *testing.T, refOpts *ReferenceOptions,
err error) {
+ assert.Nil(t, err)
+ assert.Equal(t, constant.ClusterKeyFailback,
refOpts.Reference.Cluster)
+ },
+ },
+ {
+ desc: "config FailFast Cluster strategy",
+ opts: []ReferenceOption{
+ WithClusterFailFast(),
+ },
+ verify: func(t *testing.T, refOpts *ReferenceOptions,
err error) {
+ assert.Nil(t, err)
+ assert.Equal(t, constant.ClusterKeyFailfast,
refOpts.Reference.Cluster)
+ },
+ },
+ {
+ desc: "config FailOver Cluster strategy",
+ opts: []ReferenceOption{
+ WithClusterFailOver(),
+ },
+ verify: func(t *testing.T, refOpts *ReferenceOptions,
err error) {
+ assert.Nil(t, err)
+ assert.Equal(t, constant.ClusterKeyFailover,
refOpts.Reference.Cluster)
+ },
+ },
+ {
+ desc: "config FailSafe Cluster strategy",
+ opts: []ReferenceOption{
+ WithClusterFailSafe(),
+ },
+ verify: func(t *testing.T, refOpts *ReferenceOptions,
err error) {
+ assert.Nil(t, err)
+ assert.Equal(t, constant.ClusterKeyFailsafe,
refOpts.Reference.Cluster)
+ },
+ },
+ {
+ desc: "config Forking Cluster strategy",
+ opts: []ReferenceOption{
+ WithClusterForking(),
+ },
+ verify: func(t *testing.T, refOpts *ReferenceOptions,
err error) {
+ assert.Nil(t, err)
+ assert.Equal(t, constant.ClusterKeyForking,
refOpts.Reference.Cluster)
+ },
+ },
+ {
+ desc: "config ZoneAware Cluster strategy",
+ opts: []ReferenceOption{
+ WithClusterZoneAware(),
+ },
+ verify: func(t *testing.T, refOpts *ReferenceOptions,
err error) {
+ assert.Nil(t, err)
+ assert.Equal(t, constant.ClusterKeyZoneAware,
refOpts.Reference.Cluster)
+ },
+ },
+ {
+ desc: "config AdaptiveService Cluster strategy",
+ opts: []ReferenceOption{
+ WithClusterAdaptiveService(),
+ },
+ verify: func(t *testing.T, refOpts *ReferenceOptions,
err error) {
+ assert.Nil(t, err)
+ assert.Equal(t,
constant.ClusterKeyAdaptiveService, refOpts.Reference.Cluster)
+ },
+ },
+ }
+ processReferenceOptionsInitCases(t, cases)
+}
+
+func TestWithLoadBalance(t *testing.T) {
+ cases := []referenceOptionsInitCase{
+ // todo(DMwangnima): think about default loadbalance strategy
+ //{
+ // desc: "default Cluster strategy",
+ // opts: []ClientOption{},
+ // verify: func(t *testing.T, cli *Client, err error) {
+ // assert.Nil(t, err)
+ // assert.Equal(t, constant.ClusterKeyFailover,
cli.cliOpts.overallReference.Cluster)
+ // },
+ //},
+ {
+ desc: "config ConsistentHashing LoadBalance strategy",
+ opts: []ReferenceOption{
+ WithLoadBalanceConsistentHashing(),
+ },
+ verify: func(t *testing.T, refOpts *ReferenceOptions,
err error) {
+ assert.Nil(t, err)
+ assert.Equal(t,
constant.LoadBalanceKeyConsistentHashing, refOpts.Reference.Loadbalance)
+ },
+ },
+ {
+ desc: "config LeastActive LoadBalance strategy",
+ opts: []ReferenceOption{
+ WithLoadBalanceLeastActive(),
+ },
+ verify: func(t *testing.T, refOpts *ReferenceOptions,
err error) {
+ assert.Nil(t, err)
+ assert.Equal(t,
constant.LoadBalanceKeyLeastActive, refOpts.Reference.Loadbalance)
+ },
+ },
+ {
+ desc: "config Random LoadBalance strategy",
+ opts: []ReferenceOption{
+ WithLoadBalanceRandom(),
+ },
+ verify: func(t *testing.T, refOpts *ReferenceOptions,
err error) {
+ assert.Nil(t, err)
+ assert.Equal(t, constant.LoadBalanceKeyRandom,
refOpts.Reference.Loadbalance)
+ },
+ },
+ {
+ desc: "config RoundRobin LoadBalance strategy",
+ opts: []ReferenceOption{
+ WithLoadBalanceRoundRobin(),
+ },
+ verify: func(t *testing.T, refOpts *ReferenceOptions,
err error) {
+ assert.Nil(t, err)
+ assert.Equal(t,
constant.LoadBalanceKeyRoundRobin, refOpts.Reference.Loadbalance)
+ },
+ },
+ {
+ desc: "config P2C LoadBalance strategy",
+ opts: []ReferenceOption{
+ WithLoadBalanceP2C(),
+ },
+ verify: func(t *testing.T, refOpts *ReferenceOptions,
err error) {
+ assert.Nil(t, err)
+ assert.Equal(t, constant.LoadBalanceKeyP2C,
refOpts.Reference.Loadbalance)
+ },
+ },
+ }
+ processReferenceOptionsInitCases(t, cases)
+}
+
+func TestWithRetries(t *testing.T) {
+ cases := []referenceOptionsInitCase{
+ {
+ desc: "config normal retries",
+ opts: []ReferenceOption{
+ WithRetries(3),
+ },
+ verify: func(t *testing.T, refOpts *ReferenceOptions,
err error) {
+ assert.Nil(t, err)
+ assert.Equal(t, "3", refOpts.Reference.Retries)
+ },
+ },
+ }
+ processReferenceOptionsInitCases(t, cases)
+}
+
+func TestWithGroup(t *testing.T) {
+ cases := []referenceOptionsInitCase{
+ {
+ desc: "config normal group",
+ opts: []ReferenceOption{
+ WithGroup("test_group"),
+ },
+ verify: func(t *testing.T, refOpts *ReferenceOptions,
err error) {
+ assert.Nil(t, err)
+ assert.Equal(t, "test_group",
refOpts.Reference.Group)
+ },
+ },
+ }
+ processReferenceOptionsInitCases(t, cases)
+}
+
+func TestWithVersion(t *testing.T) {
+ cases := []referenceOptionsInitCase{
+ {
+ desc: "config normal version",
+ opts: []ReferenceOption{
+ WithVersion("test_version"),
+ },
+ verify: func(t *testing.T, refOpts *ReferenceOptions,
err error) {
+ assert.Nil(t, err)
+ assert.Equal(t, "test_version",
refOpts.Reference.Version)
+ },
+ },
+ }
+ processReferenceOptionsInitCases(t, cases)
+}
+
+func TestWithSerialization(t *testing.T) {
+ cases := []referenceOptionsInitCase{
+ {
+ desc: "default Serialization",
+ opts: []ReferenceOption{},
+ verify: func(t *testing.T, refOpts *ReferenceOptions,
err error) {
+ assert.Nil(t, err)
+ assert.Equal(t, constant.ProtobufSerialization,
refOpts.Reference.Serialization)
+ },
+ },
+ {
+ desc: "config JSON Serialization",
+ opts: []ReferenceOption{
+ WithSerializationJSON(),
+ },
+ verify: func(t *testing.T, refOpts *ReferenceOptions,
err error) {
+ assert.Nil(t, err)
+ assert.Equal(t, constant.JSONSerialization,
refOpts.Reference.Serialization)
+ },
+ },
+ }
+ processReferenceOptionsInitCases(t, cases)
+}
+
+func TestWithProvidedBy(t *testing.T) {
+ cases := []referenceOptionsInitCase{
+ {
+ desc: "config normal ProvidedBy",
+ opts: []ReferenceOption{
+ WithProvidedBy("test_instance"),
+ },
+ verify: func(t *testing.T, refOpts *ReferenceOptions,
err error) {
+ assert.Nil(t, err)
+ assert.Equal(t, "test_instance",
refOpts.Reference.ProvidedBy)
+ },
+ },
+ }
+ processReferenceOptionsInitCases(t, cases)
+}
+
+func TestWithParams(t *testing.T) {
+ cases := []referenceOptionsInitCase{
+ {
+ desc: "config normal params",
+ opts: []ReferenceOption{
+ WithParams(map[string]string{
+ "test_key": "test_val",
+ }),
+ },
+ verify: func(t *testing.T, refOpts *ReferenceOptions,
err error) {
+ assert.Nil(t, err)
+ assert.Equal(t, map[string]string{"test_key":
"test_val"}, refOpts.Reference.Params)
+ },
+ },
+ {
+ desc: "config nil params",
+ opts: []ReferenceOption{
+ WithParams(nil),
+ },
+ verify: func(t *testing.T, refOpts *ReferenceOptions,
err error) {
+ assert.Nil(t, err)
+ assert.Nil(t, refOpts.Reference.Params)
+ },
+ },
+ {
+ desc: "config nil params with type information",
+ opts: []ReferenceOption{
+ WithParams((map[string]string)(nil)),
+ },
+ verify: func(t *testing.T, refOpts *ReferenceOptions,
err error) {
+ assert.Nil(t, err)
+ assert.Nil(t, refOpts.Reference.Params)
+ },
+ },
+ {
+ desc: "config params without key-val",
+ opts: []ReferenceOption{
+ WithParams(map[string]string{}),
+ },
+ verify: func(t *testing.T, refOpts *ReferenceOptions,
err error) {
+ assert.Nil(t, err)
+ assert.Nil(t, refOpts.Reference.Params)
+ },
+ },
+ }
+ processReferenceOptionsInitCases(t, cases)
+}
+
+//func TestWithClientParam(t *testing.T) {
+// cases := []newClientCase{
// {
+// desc: "config normal param",
// opts: []ClientOption{
-// WithClientURL("127.0.0.1:20000"),
+// WithClientParam("test_key", "test_val"),
// },
-// justify: func(t *testing.T, opts *ClientOptions) {
-// urls := opts.urls
-// assert.Equal(t, 1, len(urls))
-// assert.Equal(t, "tri", urls[0].Protocol)
+// verify: func(t *testing.T, cli *Client, err error) {
+// assert.Nil(t, err)
+// assert.Equal(t, map[string]string{"test_key":
"test_val"}, cli.cliOpts.overallReference.Params)
// },
// },
// {
+// desc: "config normal param multiple times",
// opts: []ClientOption{
-// WithClientURL("tri://127.0.0.1:20000"),
+// WithClientParam("test_key", "test_val"),
+// WithClientParam("test_key1", "test_val1"),
// },
-// justify: func(t *testing.T, opts *ClientOptions) {
-// urls := opts.urls
-// assert.Equal(t, 1, len(urls))
-// assert.Equal(t, "tri", urls[0].Protocol)
+// verify: func(t *testing.T, cli *Client, err error) {
+// assert.Nil(t, err)
+// assert.Equal(t, map[string]string{"test_key":
"test_val", "test_key1": "test_val1"}, cli.cliOpts.overallReference.Params)
+// },
+// },
+// {
+// desc: "config param with empty key",
+// opts: []ClientOption{
+// WithClientParam("", ""),
+// },
+// verify: func(t *testing.T, cli *Client, err error) {
+// assert.Nil(t, err)
+// assert.Equal(t, map[string]string{"": ""},
cli.cliOpts.overallReference.Params)
// },
// },
// }
-//
-// for _, test := range tests {
-// newOpts := defaultClientOptions()
-// assert.Nil(t, newOpts.init(test.opts...))
-// assert.Nil(t, newOpts.processURL(&common.URL{}))
-// test.justify(t, newOpts)
-// }
+// processNewClientCases(t, cases)
//}
+
+func TestWithSticky(t *testing.T) {
+ cases := []referenceOptionsInitCase{
+ {
+ desc: "config sticky",
+ opts: []ReferenceOption{
+ WithSticky(),
+ },
+ verify: func(t *testing.T, refOpts *ReferenceOptions,
err error) {
+ assert.Nil(t, err)
+ assert.True(t, refOpts.Reference.Sticky)
+ },
+ },
+ }
+ processReferenceOptionsInitCases(t, cases)
+}
+
+func TestWithProtocol(t *testing.T) {
+ cases := []referenceOptionsInitCase{
+ {
+ desc: "default Protocol",
+ opts: []ReferenceOption{},
+ verify: func(t *testing.T, refOpts *ReferenceOptions,
err error) {
+ assert.Nil(t, err)
+ assert.Equal(t, "tri",
refOpts.Reference.Protocol)
+ },
+ },
+ {
+ desc: "config Dubbo Protocol",
+ opts: []ReferenceOption{
+ WithProtocolDubbo(),
+ },
+ verify: func(t *testing.T, refOpts *ReferenceOptions,
err error) {
+ assert.Nil(t, err)
+ assert.Equal(t, constant.Dubbo,
refOpts.Reference.Protocol)
+ },
+ },
+ {
+ desc: "config Triple Protocol",
+ opts: []ReferenceOption{
+ WithProtocolTriple(),
+ },
+ verify: func(t *testing.T, refOpts *ReferenceOptions,
err error) {
+ assert.Nil(t, err)
+ assert.Equal(t, "tri",
refOpts.Reference.Protocol)
+ },
+ },
+ {
+ desc: "config JsonRPC Protocol",
+ opts: []ReferenceOption{
+ WithProtocolJsonRPC(),
+ },
+ verify: func(t *testing.T, refOpts *ReferenceOptions,
err error) {
+ assert.Nil(t, err)
+ assert.Equal(t, "jsonrpc",
refOpts.Reference.Protocol)
+ },
+ },
+ }
+ processReferenceOptionsInitCases(t, cases)
+}
+
+func TestWithRequestTimeout(t *testing.T) {
+ cases := []referenceOptionsInitCase{
+ {
+ desc: "config normal RequestTimeout",
+ opts: []ReferenceOption{
+ WithRequestTimeout(6 * time.Second),
+ },
+ verify: func(t *testing.T, refOpts *ReferenceOptions,
err error) {
+ assert.Nil(t, err)
+ assert.Equal(t, "6s",
refOpts.Reference.RequestTimeout)
+ },
+ },
+ }
+ processReferenceOptionsInitCases(t, cases)
+}
+
+func TestWithForceTag(t *testing.T) {
+ cases := []referenceOptionsInitCase{
+ {
+ desc: "config ForceTag",
+ opts: []ReferenceOption{
+ WithForceTag(),
+ },
+ verify: func(t *testing.T, refOpts *ReferenceOptions,
err error) {
+ assert.Nil(t, err)
+ assert.True(t, refOpts.Reference.ForceTag)
+ },
+ },
+ }
+ processReferenceOptionsInitCases(t, cases)
+}
+
+func TestWithMeshProviderPort(t *testing.T) {
+ cases := []referenceOptionsInitCase{
+ {
+ desc: "config normal MeshProviderPort",
+ opts: []ReferenceOption{
+ WithMeshProviderPort(20001),
+ },
+ verify: func(t *testing.T, refOpts *ReferenceOptions,
err error) {
+ assert.Nil(t, err)
+ assert.Equal(t, 20001,
refOpts.Reference.MeshProviderPort)
+ },
+ },
+ }
+ processReferenceOptionsInitCases(t, cases)
+}
diff --git a/config/application_config.go b/config/application_config.go
index 6bdf837e4..c9de915b4 100644
--- a/config/application_config.go
+++ b/config/application_config.go
@@ -19,6 +19,7 @@ package config
import (
"github.com/creasty/defaults"
+
"github.com/pkg/errors"
)
diff --git a/dubbo.go b/dubbo.go
index 2e6c2e198..b779b78f7 100644
--- a/dubbo.go
+++ b/dubbo.go
@@ -23,6 +23,7 @@ import (
import (
"github.com/dubbogo/gost/log/logger"
+
"github.com/pkg/errors"
)
@@ -85,7 +86,7 @@ func (ins *Instance) NewClient(opts ...client.ClientOption)
(*client.Client, err
cliOpts = append(cliOpts,
client.WithClientFilter(conCfg.Filter),
// todo(DMwangnima): deal with Protocol
- client.WithClientRegistryIDs(conCfg.RegistryIDs),
+ client.WithClientRegistryIDs(conCfg.RegistryIDs...),
// todo(DMwangnima): deal with TracingKey
client.SetClientConsumer(conCfg),
)
@@ -206,7 +207,7 @@ func (ins *Instance) loadProvider() error {
logger.Fatalf("Failed to start server, err: %v", err)
}
}()
- return err
+ return nil
}
// loadConsumer loads the service consumer.
@@ -218,13 +219,14 @@ func (ins *Instance) loadConsumer() error {
// refer services
conLock.RLock()
defer conLock.RUnlock()
- for _, definition := range consumerServices {
- if _, _, err = cli.Init(definition.Info); err != nil {
- return err
+ for intfName, definition := range consumerServices {
+ conn, dialErr := cli.DialWithInfo(intfName, definition.Info)
+ if dialErr != nil {
+ return dialErr
}
- definition.Info.ClientInjectFunc(definition.Svc, cli)
+ definition.Info.ConnectionInjectFunc(definition.Svc, conn)
}
- return err
+ return nil
}
// SetConsumerServiceWithInfo sets the consumer service with the client
information.
diff --git a/global/consumer_config.go b/global/consumer_config.go
index d59bc6381..ee7c21e02 100644
--- a/global/consumer_config.go
+++ b/global/consumer_config.go
@@ -18,7 +18,6 @@
package global
type ConsumerConfig struct {
- ReferenceConfig
Filter string `yaml:"filter" json:"filter,omitempty"
property:"filter"`
RegistryIDs []string `yaml:"registry-ids"
json:"registry-ids,omitempty" property:"registry-ids"`
Protocol string `yaml:"protocol" json:"protocol,omitempty"
property:"protocol"`
@@ -27,16 +26,17 @@ type ConsumerConfig struct {
Check bool `yaml:"check" json:"check,omitempty"
property:"check"`
AdaptiveService bool `default:"false" yaml:"adaptive-service"
json:"adaptive-service" property:"adaptive-service"`
// there is no need to configure References, it will be replaced by
instance.NewClient
- //References map[string]*client.ReferenceConfig
`yaml:"references" json:"references,omitempty" property:"references"`
- TracingKey string `yaml:"tracing-key"
json:"tracing-key" property:"tracing-key"`
- FilterConf interface{} `yaml:"filter-conf"
json:"filter-conf,omitempty" property:"filter-conf"`
- MaxWaitTimeForServiceDiscovery string `default:"3s"
yaml:"max-wait-time-for-service-discovery"
json:"max-wait-time-for-service-discovery,omitempty"
property:"max-wait-time-for-service-discovery"`
- MeshEnabled bool `yaml:"mesh-enabled"
json:"mesh-enabled,omitempty" property:"mesh-enabled"`
+ References map[string]*ReferenceConfig
`yaml:"references" json:"references,omitempty" property:"references"`
+ TracingKey string
`yaml:"tracing-key" json:"tracing-key" property:"tracing-key"`
+ FilterConf interface{}
`yaml:"filter-conf" json:"filter-conf,omitempty" property:"filter-conf"`
+ MaxWaitTimeForServiceDiscovery string
`default:"3s" yaml:"max-wait-time-for-service-discovery"
json:"max-wait-time-for-service-discovery,omitempty"
property:"max-wait-time-for-service-discovery"`
+ MeshEnabled bool
`yaml:"mesh-enabled" json:"mesh-enabled,omitempty" property:"mesh-enabled"`
}
func DefaultConsumerConfig() *ConsumerConfig {
return &ConsumerConfig{
RequestTimeout: "3s",
Check: true,
+ References: make(map[string]*ReferenceConfig),
}
}
diff --git a/global/reference_config.go b/global/reference_config.go
index 94070fd09..5acb6427f 100644
--- a/global/reference_config.go
+++ b/global/reference_config.go
@@ -50,9 +50,9 @@ type ReferenceConfig struct {
func DefaultReferenceConfig() *ReferenceConfig {
return &ReferenceConfig{
// use Triple protocol by default
- Protocol: "tri",
- Methods: make([]*MethodConfig, 0, 8),
- Params: make(map[string]string, 8),
+ //Protocol: "tri",
+ Methods: make([]*MethodConfig, 0, 8),
+ //Params: make(map[string]string, 8),
}
}
diff --git a/protocol/dubbo/dubbo_codec.go b/protocol/dubbo/dubbo_codec.go
index 92f7132bf..ec7cbfd80 100644
--- a/protocol/dubbo/dubbo_codec.go
+++ b/protocol/dubbo/dubbo_codec.go
@@ -19,7 +19,6 @@ package dubbo
import (
"bytes"
- "strconv"
"time"
)
@@ -70,11 +69,12 @@ func (c *DubboCodec) EncodeRequest(request
*remoting.Request) (*bytes.Buffer, er
svc.Version =
invocation.GetAttachmentWithDefaultValue(constant.VersionKey, "")
svc.Group = invocation.GetAttachmentWithDefaultValue(constant.GroupKey,
"")
svc.Method = invocation.MethodName()
- timeout, err :=
strconv.Atoi(invocation.GetAttachmentWithDefaultValue(constant.TimeoutKey,
strconv.Itoa(constant.DefaultRemotingTimeout)))
- if err != nil {
- // it will be wrapped in readwrite.Write .
- return nil, perrors.WithStack(err)
- }
+ //timeout, err :=
strconv.Atoi(invocation.GetAttachmentWithDefaultValue(constant.TimeoutKey,
strconv.Itoa(constant.DefaultRemotingTimeout)))
+ timeout := 300000
+ //if err != nil {
+ // // it will be wrapped in readwrite.Write .
+ // return nil, perrors.WithStack(err)
+ //}
svc.Timeout = time.Duration(timeout)
header := impl.DubboHeader{}
diff --git a/protocol/triple/internal/client/cmd_instance_with_registry/main.go
b/protocol/dubbo/example/new/client/main.go
similarity index 58%
copy from protocol/triple/internal/client/cmd_instance_with_registry/main.go
copy to protocol/dubbo/example/new/client/main.go
index b3e0b0de4..016d63575 100644
--- a/protocol/triple/internal/client/cmd_instance_with_registry/main.go
+++ b/protocol/dubbo/example/new/client/main.go
@@ -18,40 +18,35 @@
package main
import (
- "dubbo.apache.org/dubbo-go/v3"
+ "context"
+)
+
+import (
+ "github.com/dubbogo/gost/log/logger"
+)
+
+import (
"dubbo.apache.org/dubbo-go/v3/client"
_ "dubbo.apache.org/dubbo-go/v3/imports"
- "dubbo.apache.org/dubbo-go/v3/protocol/triple/internal/client/common"
-
"dubbo.apache.org/dubbo-go/v3/protocol/triple/internal/proto/triple_gen/greettriple"
- "dubbo.apache.org/dubbo-go/v3/registry"
)
func main() {
- // global conception
- // configure global configurations and common modules
- ins, err := dubbo.NewInstance(
- dubbo.WithName("dubbo_test"),
- dubbo.WithRegistry(
- registry.WithID("zk"),
- registry.WithZookeeper(),
- registry.WithAddress("127.0.0.1:2181"),
- ),
+ cli, err := client.NewClient(
+ client.WithClientProtocolDubbo(),
)
if err != nil {
panic(err)
}
- // configure the params that only client layer cares
- cli, err := ins.NewClient(
- client.WithClientRegistryIDs([]string{"zk"}),
+ conn, err := cli.Dial("GreetProvider",
+ client.WithURL("127.0.0.1:20000"),
)
if err != nil {
panic(err)
}
-
- svc, err := greettriple.NewGreetService(cli)
- if err != nil {
- panic(err)
+ var resp string
+ if err := conn.CallUnary(context.Background(), []interface{}{"hello",
"new", "dubbo"}, &resp, "Greet"); err != nil {
+ logger.Errorf("GreetProvider.Greet err: %s", err)
+ return
}
-
- common.TestClient(svc)
+ logger.Infof("Get Response: %s", resp)
}
diff --git a/protocol/triple/internal/client/cmd_instance_with_registry/main.go
b/protocol/dubbo/example/new/server/main.go
similarity index 55%
copy from protocol/triple/internal/client/cmd_instance_with_registry/main.go
copy to protocol/dubbo/example/new/server/main.go
index b3e0b0de4..19620f0cc 100644
--- a/protocol/triple/internal/client/cmd_instance_with_registry/main.go
+++ b/protocol/dubbo/example/new/server/main.go
@@ -18,40 +18,32 @@
package main
import (
- "dubbo.apache.org/dubbo-go/v3"
- "dubbo.apache.org/dubbo-go/v3/client"
_ "dubbo.apache.org/dubbo-go/v3/imports"
- "dubbo.apache.org/dubbo-go/v3/protocol/triple/internal/client/common"
-
"dubbo.apache.org/dubbo-go/v3/protocol/triple/internal/proto/triple_gen/greettriple"
- "dubbo.apache.org/dubbo-go/v3/registry"
+ "dubbo.apache.org/dubbo-go/v3/protocol"
+ "dubbo.apache.org/dubbo-go/v3/server"
)
+type GreetProvider struct {
+}
+
+func (*GreetProvider) Greet(req string, req1 string, req2 string) (string,
error) {
+ return req + req1 + req2, nil
+}
+
func main() {
- // global conception
- // configure global configurations and common modules
- ins, err := dubbo.NewInstance(
- dubbo.WithName("dubbo_test"),
- dubbo.WithRegistry(
- registry.WithID("zk"),
- registry.WithZookeeper(),
- registry.WithAddress("127.0.0.1:2181"),
+ srv, err := server.NewServer(
+ server.WithServerProtocol(
+ protocol.WithDubbo(),
+ protocol.WithPort(20000),
),
)
if err != nil {
panic(err)
}
- // configure the params that only client layer cares
- cli, err := ins.NewClient(
- client.WithClientRegistryIDs([]string{"zk"}),
- )
- if err != nil {
+ if err := srv.Register(&GreetProvider{}, nil,
server.WithInterface("GreetProvider")); err != nil {
panic(err)
}
-
- svc, err := greettriple.NewGreetService(cli)
- if err != nil {
+ if err := srv.Serve(); err != nil {
panic(err)
}
-
- common.TestClient(svc)
}
diff --git a/protocol/triple/health/triple_health/health.triple.go
b/protocol/triple/health/triple_health/health.triple.go
index 79de1641f..908737fcc 100644
--- a/protocol/triple/health/triple_health/health.triple.go
+++ b/protocol/triple/health/triple_health/health.triple.go
@@ -74,37 +74,31 @@ type Health interface {
// NewHealth constructs a client for the grpc.health.v1.Health service.
func NewHealth(cli *client.Client, opts ...client.ReferenceOption) (Health,
error) {
- group, version, err := cli.Init(&Health_ClientInfo, opts...)
+ conn, err := cli.Dial("grpc.health.v1.Health", opts...)
if err != nil {
return nil, err
}
return &HealthImpl{
- cli: cli,
- group: group,
- version: version,
+ conn: conn,
}, nil
}
// HealthImpl implements Health.
type HealthImpl struct {
- cli *client.Client
- group string
- version string
+ conn *client.Connection
}
func (c *HealthImpl) Check(ctx context.Context, req *HealthCheckRequest, opts
...client.CallOption) (*HealthCheckResponse, error) {
- opts = appendGroupVersion(opts, c)
resp := new(HealthCheckResponse)
- if err := c.cli.CallUnary(ctx, req, resp, "grpc.health.v1.Health",
"Check", opts...); err != nil {
+ if err := c.conn.CallUnary(ctx, []interface{}{req}, resp, "Check",
opts...); err != nil {
return nil, err
}
return resp, nil
}
func (c *HealthImpl) Watch(ctx context.Context, req *HealthCheckRequest, opts
...client.CallOption) (Health_WatchClient, error) {
- opts = appendGroupVersion(opts, c)
- stream, err := c.cli.CallServerStream(ctx, req,
"grpc.health.v1.Health", "Watch", opts...)
+ stream, err := c.conn.CallServerStream(ctx, req, "Watch", opts...)
if err != nil {
return nil, err
}
@@ -112,12 +106,6 @@ func (c *HealthImpl) Watch(ctx context.Context, req
*HealthCheckRequest, opts ..
return &HealthWatchClient{rawStream}, nil
}
-func appendGroupVersion(opts []client.CallOption, c *HealthImpl)
[]client.CallOption {
- opts = append(opts, client.WithCallGroup(c.group))
- opts = append(opts, client.WithCallVersion(c.version))
- return opts
-}
-
type Health_WatchClient interface {
Recv() bool
ResponseHeader() http.Header
@@ -152,9 +140,9 @@ func (cli *HealthWatchClient) Conn()
(triple_protocol.StreamingClientConn, error
var Health_ClientInfo = client.ClientInfo{
InterfaceName: "grpc.health.v1.Health",
MethodNames: []string{"Check", "Watch"},
- ClientInjectFunc: func(dubboCliRaw interface{}, cli *client.Client) {
+ ConnectionInjectFunc: func(dubboCliRaw interface{}, conn
*client.Connection) {
dubboCli := dubboCliRaw.(HealthImpl)
- dubboCli.cli = cli
+ dubboCli.conn = conn
},
}
diff --git a/protocol/triple/internal/client/cmd_instance_with_registry/main.go
b/protocol/triple/internal/client/cmd_instance_with_registry/main.go
index b3e0b0de4..7da6d0b96 100644
--- a/protocol/triple/internal/client/cmd_instance_with_registry/main.go
+++ b/protocol/triple/internal/client/cmd_instance_with_registry/main.go
@@ -42,7 +42,7 @@ func main() {
}
// configure the params that only client layer cares
cli, err := ins.NewClient(
- client.WithClientRegistryIDs([]string{"zk"}),
+ client.WithClientRegistryIDs("zk"),
)
if err != nil {
panic(err)
diff --git
a/protocol/triple/internal/proto/triple_gen/greettriple/greet.triple.go
b/protocol/triple/internal/proto/triple_gen/greettriple/greet.triple.go
index 45d3275ab..eeb15c23f 100644
--- a/protocol/triple/internal/proto/triple_gen/greettriple/greet.triple.go
+++ b/protocol/triple/internal/proto/triple_gen/greettriple/greet.triple.go
@@ -90,15 +90,12 @@ type GreetService interface {
// NewGreetService constructs a client for the greet.GreetService service.
func NewGreetService(cli *client.Client, opts ...client.ReferenceOption)
(GreetService, error) {
- group, version, err := cli.Init(&GreetService_ClientInfo, opts...)
+ conn, err := cli.DialWithInfo("greet.GreetService",
&GreetService_ClientInfo, opts...)
if err != nil {
return nil, err
}
-
return &GreetServiceImpl{
- cli: cli,
- group: group,
- version: version,
+ conn: conn,
}, nil
}
@@ -108,23 +105,19 @@ func SetConsumerService(srv common.RPCService) {
// GreetServiceImpl implements GreetService.
type GreetServiceImpl struct {
- cli *client.Client
- group string
- version string
+ conn *client.Connection
}
func (c *GreetServiceImpl) Greet(ctx context.Context, req *proto.GreetRequest,
opts ...client.CallOption) (*proto.GreetResponse, error) {
resp := new(proto.GreetResponse)
- opts = appendGroupVersion(opts, c)
- if err := c.cli.CallUnary(ctx, req, resp, "greet.GreetService",
"Greet", opts...); err != nil {
+ if err := c.conn.CallUnary(ctx, []interface{}{req}, resp, "Greet",
opts...); err != nil {
return nil, err
}
return resp, nil
}
func (c *GreetServiceImpl) GreetStream(ctx context.Context, opts
...client.CallOption) (GreetService_GreetStreamClient, error) {
- opts = appendGroupVersion(opts, c)
- stream, err := c.cli.CallBidiStream(ctx, "greet.GreetService",
"GreetStream", opts...)
+ stream, err := c.conn.CallBidiStream(ctx, "GreetStream", opts...)
if err != nil {
return nil, err
}
@@ -133,8 +126,7 @@ func (c *GreetServiceImpl) GreetStream(ctx context.Context,
opts ...client.CallO
}
func (c *GreetServiceImpl) GreetClientStream(ctx context.Context, opts
...client.CallOption) (GreetService_GreetClientStreamClient, error) {
- opts = appendGroupVersion(opts, c)
- stream, err := c.cli.CallClientStream(ctx, "greet.GreetService",
"GreetClientStream", opts...)
+ stream, err := c.conn.CallClientStream(ctx, "GreetClientStream",
opts...)
if err != nil {
return nil, err
}
@@ -143,8 +135,7 @@ func (c *GreetServiceImpl) GreetClientStream(ctx
context.Context, opts ...client
}
func (c *GreetServiceImpl) GreetServerStream(ctx context.Context, req
*proto.GreetServerStreamRequest, opts ...client.CallOption)
(GreetService_GreetServerStreamClient, error) {
- opts = appendGroupVersion(opts, c)
- stream, err := c.cli.CallServerStream(ctx, req, "greet.GreetService",
"GreetServerStream", opts...)
+ stream, err := c.conn.CallServerStream(ctx, req, "GreetServerStream",
opts...)
if err != nil {
return nil, err
}
@@ -152,12 +143,6 @@ func (c *GreetServiceImpl) GreetServerStream(ctx
context.Context, req *proto.Gre
return &GreetServiceGreetServerStreamClient{rawStream}, nil
}
-func appendGroupVersion(opts []client.CallOption, c *GreetServiceImpl)
[]client.CallOption {
- opts = append(opts, client.WithCallGroup(c.group))
- opts = append(opts, client.WithCallVersion(c.version))
- return opts
-}
-
type GreetService_GreetStreamClient interface {
Spec() triple_protocol.Spec
Peer() triple_protocol.Peer
@@ -250,9 +235,9 @@ func (cli *GreetServiceGreetServerStreamClient) Conn()
(triple_protocol.Streamin
var GreetService_ClientInfo = client.ClientInfo{
InterfaceName: "greet.GreetService",
MethodNames: []string{"Greet", "GreetStream", "GreetClientStream",
"GreetServerStream"},
- ClientInjectFunc: func(dubboCliRaw interface{}, cli *client.Client) {
- dubboCli := dubboCliRaw.(*GreetServiceImpl)
- dubboCli.cli = cli
+ ConnectionInjectFunc: func(dubboCliRaw interface{}, conn
*client.Connection) {
+ dubboCli := dubboCliRaw.(GreetServiceImpl)
+ dubboCli.conn = conn
},
}
diff --git a/protocol/triple/triple_invoker.go
b/protocol/triple/triple_invoker.go
index 86eeaa673..525fae69b 100644
--- a/protocol/triple/triple_invoker.go
+++ b/protocol/triple/triple_invoker.go
@@ -85,6 +85,7 @@ func (ti *TripleInvoker) Invoke(ctx context.Context,
invocation protocol.Invocat
// e.g. Client.CallUnary(... req, resp []interface, ...)
// inRaw represents req and resp, inRawLen represents 2.
inRaw := invocation.ParameterRawValues()
+ invocation.Reply()
inRawLen := len(inRaw)
method := invocation.MethodName()
// todo(DMwangnima): process headers(metadata) passed in
diff --git a/server/action.go b/server/action.go
index 46f41ac3a..f32b3dbe4 100644
--- a/server/action.go
+++ b/server/action.go
@@ -133,33 +133,35 @@ func (svcOpts *ServiceOptions) ExportWithInfo(info
*ServiceInfo) error {
}
func (svcOpts *ServiceOptions) export(info *ServiceInfo) error {
- srv := svcOpts.Service
+ svc := svcOpts.Service
if info != nil {
- srv.Interface = info.InterfaceName
+ if svc.Interface == "" {
+ svc.Interface = info.InterfaceName
+ }
svcOpts.Id = info.InterfaceName
svcOpts.info = info
}
// TODO: delay needExport
if svcOpts.unexported != nil && svcOpts.unexported.Load() {
- err := perrors.Errorf("The service %v has already unexported!",
srv.Interface)
+ err := perrors.Errorf("The service %v has already unexported!",
svc.Interface)
logger.Errorf(err.Error())
return err
}
if svcOpts.exported != nil && svcOpts.exported.Load() {
- logger.Warnf("The service %v has already exported!",
srv.Interface)
+ logger.Warnf("The service %v has already exported!",
svc.Interface)
return nil
}
regUrls := make([]*common.URL, 0)
- if !srv.NotRegister {
- regUrls = config.LoadRegistries(srv.RegistryIDs,
svcOpts.registriesCompat, common.PROVIDER)
+ if !svc.NotRegister {
+ regUrls = config.LoadRegistries(svc.RegistryIDs,
svcOpts.registriesCompat, common.PROVIDER)
}
urlMap := svcOpts.getUrlMap()
- protocolConfigs := loadProtocol(srv.ProtocolIDs,
svcOpts.protocolsCompat)
+ protocolConfigs := loadProtocol(svc.ProtocolIDs,
svcOpts.protocolsCompat)
if len(protocolConfigs) == 0 {
- logger.Warnf("The service %v'svcOpts '%v' protocols don't has
right protocolConfigs, Please check your configuration center and transfer
protocol ", srv.Interface, srv.ProtocolIDs)
+ logger.Warnf("The service %v'svcOpts '%v' protocols don't has
right protocolConfigs, Please check your configuration center and transfer
protocol ", svc.Interface, svc.ProtocolIDs)
return nil
}
@@ -173,10 +175,10 @@ func (svcOpts *ServiceOptions) export(info *ServiceInfo)
error {
// todo(DMwangnimg): finish replacing procedure
// registry the service reflect
- methods, err := common.ServiceMap.Register(srv.Interface,
proto.Name, srv.Group, srv.Version, svcOpts.rpcService)
+ methods, err := common.ServiceMap.Register(svc.Interface,
proto.Name, svc.Group, svc.Version, svcOpts.rpcService)
if err != nil {
formatErr := perrors.Errorf("The service %v needExport
the protocol %v error! Error message is %v.",
- srv.Interface, proto.Name, err.Error())
+ svc.Interface, proto.Name, err.Error())
logger.Errorf(formatErr.Error())
return formatErr
}
@@ -187,7 +189,7 @@ func (svcOpts *ServiceOptions) export(info *ServiceInfo)
error {
nextPort = nextPort.Next()
}
ivkURL := common.NewURLWithOptions(
- common.WithPath(srv.Interface),
+ common.WithPath(svc.Interface),
common.WithProtocol(proto.Name),
common.WithIp(proto.Ip),
common.WithPort(port),
@@ -197,14 +199,14 @@ func (svcOpts *ServiceOptions) export(info *ServiceInfo)
error {
common.WithMethods(strings.Split(methods, ",")),
// todo(DMwangnima): remove this
common.WithAttribute(constant.ServiceInfoKey, info),
- common.WithToken(srv.Token),
+ common.WithToken(svc.Token),
common.WithParamsValue(constant.MetadataTypeKey,
svcOpts.metadataType),
// fix https://github.com/apache/dubbo-go/issues/2176
common.WithParamsValue(constant.MaxServerSendMsgSize,
proto.MaxServerSendMsgSize),
common.WithParamsValue(constant.MaxServerRecvMsgSize,
proto.MaxServerRecvMsgSize),
)
- if len(srv.Tag) > 0 {
- ivkURL.AddParam(constant.Tagkey, srv.Tag)
+ if len(svc.Tag) > 0 {
+ ivkURL.AddParam(constant.Tagkey, svc.Tag)
}
// post process the URL to be exported
diff --git a/server/options.go b/server/options.go
index ca84f4c46..94b0db30b 100644
--- a/server/options.go
+++ b/server/options.go
@@ -581,6 +581,12 @@ type ServiceOption func(*ServiceOptions)
// ---------- For user ----------
+func WithInterface(intf string) ServiceOption {
+ return func(opts *ServiceOptions) {
+ opts.Service.Interface = intf
+ }
+}
+
// todo(DMwangnima): think about a more ideal configuration style
func WithRegistryIDs(registryIDs []string) ServiceOption {
return func(cfg *ServiceOptions) {
diff --git a/server/server.go b/server/server.go
index 2cd0150b0..a76d9cd59 100644
--- a/server/server.go
+++ b/server/server.go
@@ -25,6 +25,8 @@ import (
)
import (
+ "github.com/dubbogo/gost/log/logger"
+
"github.com/pkg/errors"
)
@@ -164,9 +166,16 @@ func (s *Server) Register(handler interface{}, info
*ServiceInfo, opts ...Servic
}
func (s *Server) exportServices() (err error) {
- s.svcOptsMap.Range(func(newSvcOpts, info interface{}) bool {
- err =
newSvcOpts.(*ServiceOptions).ExportWithInfo(info.(*ServiceInfo))
+ s.svcOptsMap.Range(func(svcOptsRaw, infoRaw interface{}) bool {
+ svcOpts := svcOptsRaw.(*ServiceOptions)
+ if infoRaw == nil {
+ err = svcOpts.ExportWithoutInfo()
+ } else {
+ info := infoRaw.(*ServiceInfo)
+ err = svcOpts.ExportWithInfo(info)
+ }
if err != nil {
+ logger.Errorf("export %s service failed, err: %s",
svcOpts.Service.Interface, err)
return false
}
return true