This is an automated email from the ASF dual-hosted git repository. alexstocks pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/dubbo-go.git
The following commit(s) were added to refs/heads/main by this push: new afb8a615e add: tag route static config (#2304) afb8a615e is described below commit afb8a615e06c5f48ada746f941c9611f02346440 Author: wudong5 <63356845+wudo...@users.noreply.github.com> AuthorDate: Tue May 30 20:24:43 2023 +0800 add: tag route static config (#2304) * add subscribe any value * fix nill error bug * fix bug that commentted by niu Signed-off-by: wudong <ustb...@163.com> * add static * use strings.Join * fix test error --------- Signed-off-by: wudong <ustb...@163.com> Co-authored-by: Ken Liu <ken.lj...@gmail.com> --- cluster/router/tag/router.go | 11 ++++--- cluster/router/tag/router_test.go | 38 ++++++++++++---------- config/application_config.go | 1 + config/config_loader.go | 1 + config_center/zookeeper/impl.go | 1 + imports/imports.go | 1 + registry/service_instance.go | 16 +++++++-- registry/zookeeper/service_discovery.go | 2 ++ .../curator_discovery/service_instance.go | 1 + 9 files changed, 47 insertions(+), 25 deletions(-) diff --git a/cluster/router/tag/router.go b/cluster/router/tag/router.go index d642a792a..7561cf1e0 100644 --- a/cluster/router/tag/router.go +++ b/cluster/router/tag/router.go @@ -52,7 +52,8 @@ func (p *PriorityRouter) Route(invokers []protocol.Invoker, url *common.URL, inv logger.Warnf("[tag router] invokers from previous router is empty") return invokers } - key := url.Service() + constant.TagRouterRuleSuffix + // tag is valid in application + key := strings.Join([]string{url.GetParam(constant.ApplicationKey, ""), constant.TagRouterRuleSuffix}, "") value, ok := p.routerConfigs.Load(key) if !ok { return staticTag(invokers, url, invocation) @@ -76,9 +77,9 @@ func (p *PriorityRouter) Notify(invokers []protocol.Invoker) { if len(invokers) == 0 { return } - service := invokers[0].GetURL().Service() - if service == "" { - logger.Error("url service is empty") + application := invokers[0].GetURL().GetParam(constant.ApplicationKey, "") + if application == "" { + logger.Error("url application is empty") return } dynamicConfiguration := conf.GetEnvInstance().GetDynamicConfiguration() @@ -86,7 +87,7 @@ func (p *PriorityRouter) Notify(invokers []protocol.Invoker) { logger.Warnf("config center does not start, please check if the configuration center has been properly configured in dubbogo.yml") return } - key := service + constant.TagRouterRuleSuffix + key := strings.Join([]string{application, constant.TagRouterRuleSuffix}, "") dynamicConfiguration.AddListener(key, p) value, err := dynamicConfiguration.GetRule(key) if err != nil { diff --git a/cluster/router/tag/router_test.go b/cluster/router/tag/router_test.go index 22ddba793..8a29503ac 100644 --- a/cluster/router/tag/router_test.go +++ b/cluster/router/tag/router_test.go @@ -129,8 +129,8 @@ func TestRouter(t *testing.T) { t.Run("dynamicEmptyTag_requestEmptyTag", func(t *testing.T) { p, err := NewTagPriorityRouter() assert.Nil(t, err) - p.routerConfigs.Store(consumerUrl.Service()+constant.TagRouterRuleSuffix, config.RouterConfig{ - Key: consumerUrl.Service() + constant.TagRouterRuleSuffix, + p.routerConfigs.Store(consumerUrl.GetParam(constant.ApplicationKey, "")+constant.TagRouterRuleSuffix, config.RouterConfig{ + Key: consumerUrl.GetParam(constant.ApplicationKey, "") + constant.TagRouterRuleSuffix, Force: false, Enabled: true, Valid: true, @@ -149,8 +149,8 @@ func TestRouter(t *testing.T) { t.Run("dynamicEmptyTag_requestHasTag", func(t *testing.T) { p, err := NewTagPriorityRouter() assert.Nil(t, err) - p.routerConfigs.Store(consumerUrl.Service()+constant.TagRouterRuleSuffix, config.RouterConfig{ - Key: consumerUrl.Service() + constant.TagRouterRuleSuffix, + p.routerConfigs.Store(consumerUrl.GetParam(constant.ApplicationKey, "")+constant.TagRouterRuleSuffix, config.RouterConfig{ + Key: consumerUrl.GetParam(constant.ApplicationKey, "") + constant.TagRouterRuleSuffix, Force: false, Enabled: true, Valid: true, @@ -170,8 +170,8 @@ func TestRouter(t *testing.T) { t.Run("dynamicTag_requestEmptyTag", func(t *testing.T) { p, err := NewTagPriorityRouter() assert.Nil(t, err) - p.routerConfigs.Store(consumerUrl.Service()+constant.TagRouterRuleSuffix, config.RouterConfig{ - Key: consumerUrl.Service() + constant.TagRouterRuleSuffix, + p.routerConfigs.Store(consumerUrl.GetParam(constant.ApplicationKey, "")+constant.TagRouterRuleSuffix, config.RouterConfig{ + Key: consumerUrl.GetParam(constant.ApplicationKey, "") + constant.TagRouterRuleSuffix, Force: false, Enabled: true, Valid: true, @@ -194,8 +194,8 @@ func TestRouter(t *testing.T) { t.Run("dynamicTag_emptyAddress_requestHasTag", func(t *testing.T) { p, err := NewTagPriorityRouter() assert.Nil(t, err) - p.routerConfigs.Store(consumerUrl.Service()+constant.TagRouterRuleSuffix, config.RouterConfig{ - Key: consumerUrl.Service() + constant.TagRouterRuleSuffix, + p.routerConfigs.Store(consumerUrl.GetParam(constant.ApplicationKey, "")+constant.TagRouterRuleSuffix, config.RouterConfig{ + Key: consumerUrl.GetParam(constant.ApplicationKey, "") + constant.TagRouterRuleSuffix, Force: false, Enabled: true, Valid: true, @@ -218,8 +218,8 @@ func TestRouter(t *testing.T) { t.Run("dynamicTag_address_requestHasTag", func(t *testing.T) { p, err := NewTagPriorityRouter() assert.Nil(t, err) - p.routerConfigs.Store(consumerUrl.Service()+constant.TagRouterRuleSuffix, config.RouterConfig{ - Key: consumerUrl.Service() + constant.TagRouterRuleSuffix, + p.routerConfigs.Store(consumerUrl.GetParam(constant.ApplicationKey, "")+constant.TagRouterRuleSuffix, config.RouterConfig{ + Key: consumerUrl.GetParam(constant.ApplicationKey, "") + constant.TagRouterRuleSuffix, Force: false, Enabled: true, Valid: true, @@ -243,8 +243,8 @@ func TestRouter(t *testing.T) { t.Run("dynamicTag_twoAddress_requestHasTag", func(t *testing.T) { p, err := NewTagPriorityRouter() assert.Nil(t, err) - p.routerConfigs.Store(consumerUrl.Service()+constant.TagRouterRuleSuffix, config.RouterConfig{ - Key: consumerUrl.Service() + constant.TagRouterRuleSuffix, + p.routerConfigs.Store(consumerUrl.GetParam(constant.ApplicationKey, "")+constant.TagRouterRuleSuffix, config.RouterConfig{ + Key: consumerUrl.GetParam(constant.ApplicationKey, "") + constant.TagRouterRuleSuffix, Force: false, Enabled: true, Valid: true, @@ -268,8 +268,8 @@ func TestRouter(t *testing.T) { t.Run("dynamicTag_addressNotMatch_requestHasTag", func(t *testing.T) { p, err := NewTagPriorityRouter() assert.Nil(t, err) - p.routerConfigs.Store(consumerUrl.Service()+constant.TagRouterRuleSuffix, config.RouterConfig{ - Key: consumerUrl.Service() + constant.TagRouterRuleSuffix, + p.routerConfigs.Store(consumerUrl.GetParam(constant.ApplicationKey, "")+constant.TagRouterRuleSuffix, config.RouterConfig{ + Key: consumerUrl.GetParam(constant.ApplicationKey, "") + constant.TagRouterRuleSuffix, Force: false, Enabled: true, Valid: true, @@ -293,8 +293,8 @@ func TestRouter(t *testing.T) { t.Run("dynamicTag_notValid", func(t *testing.T) { p, err := NewTagPriorityRouter() assert.Nil(t, err) - p.routerConfigs.Store(consumerUrl.Service()+constant.TagRouterRuleSuffix, config.RouterConfig{ - Key: consumerUrl.Service() + constant.TagRouterRuleSuffix, + p.routerConfigs.Store(consumerUrl.GetParam(constant.ApplicationKey, "")+constant.TagRouterRuleSuffix, config.RouterConfig{ + Key: consumerUrl.GetParam(constant.ApplicationKey, "") + constant.TagRouterRuleSuffix, Force: false, Enabled: true, Valid: false, @@ -338,6 +338,7 @@ func TestNotify(t *testing.T) { ivk := protocol.NewBaseInvoker(url1) ivk1 := protocol.NewBaseInvoker(url2) ivk2 := protocol.NewBaseInvoker(url3) + ivk.GetURL().SetParam(constant.ApplicationKey, "org.apache.dubbo.UserProvider.Test") invokerList := make([]protocol.Invoker, 0, 3) invokerList = append(invokerList, ivk) invokerList = append(invokerList, ivk1) @@ -359,7 +360,7 @@ tags: dc, _ := mockFactory.GetDynamicConfiguration(ccUrl) common_cfg.GetEnvInstance().SetDynamicConfiguration(dc) p.Notify(invokerList) - value, ok := p.routerConfigs.Load(url3.Service() + constant.TagRouterRuleSuffix) + value, ok := p.routerConfigs.Load(url1.GetParam(constant.ApplicationKey, "") + constant.TagRouterRuleSuffix) assert.True(t, ok) routerCfg := value.(config.RouterConfig) assert.True(t, routerCfg.Key == "org.apache.dubbo.UserProvider.Test") @@ -374,6 +375,7 @@ tags: ivk := protocol.NewBaseInvoker(url1) ivk1 := protocol.NewBaseInvoker(url2) ivk2 := protocol.NewBaseInvoker(url3) + ivk.GetURL().SetParam(constant.ApplicationKey, "org.apache.dubbo.UserProvider.Test") invokerList := make([]protocol.Invoker, 0, 3) invokerList = append(invokerList, ivk) invokerList = append(invokerList, ivk1) @@ -386,7 +388,7 @@ tags: dc, _ := mockFactory.GetDynamicConfiguration(ccUrl) common_cfg.GetEnvInstance().SetDynamicConfiguration(dc) p.Notify(invokerList) - value, ok := p.routerConfigs.Load(url3.Service() + constant.TagRouterRuleSuffix) + value, ok := p.routerConfigs.Load(url1.GetParam(constant.ApplicationKey, "") + constant.TagRouterRuleSuffix) assert.True(t, ok == false) assert.True(t, value == nil) }) diff --git a/config/application_config.go b/config/application_config.go index 13e00d5d1..24e6bc425 100644 --- a/config/application_config.go +++ b/config/application_config.go @@ -38,6 +38,7 @@ type ApplicationConfig struct { Environment string `yaml:"environment" json:"environment,omitempty" property:"environment"` // the metadata type. remote or local MetadataType string `default:"local" yaml:"metadata-type" json:"metadataType,omitempty" property:"metadataType"` + Tag string `yaml:"tag" json:"tag,omitempty" property:"tag"` } // Prefix dubbo.application diff --git a/config/config_loader.go b/config/config_loader.go index ff9dd544b..0b0d919e9 100644 --- a/config/config_loader.go +++ b/config/config_loader.go @@ -131,6 +131,7 @@ func createInstance(url *common.URL) (registry.ServiceInstance, error) { Enable: true, Healthy: true, Metadata: metadata, + Tag: appConfig.Tag, } for _, cus := range extension.GetCustomizers() { diff --git a/config_center/zookeeper/impl.go b/config_center/zookeeper/impl.go index e22f7ad21..feafea489 100644 --- a/config_center/zookeeper/impl.go +++ b/config_center/zookeeper/impl.go @@ -103,6 +103,7 @@ func newZookeeperDynamicConfiguration(url *common.URL) (*zookeeperDynamicConfigu // AddListener add listener for key // TODO this method should has a parameter 'group', and it does not now, so we should concat group and key with '/' manually func (c *zookeeperDynamicConfiguration) AddListener(key string, listener config_center.ConfigurationListener, options ...config_center.Option) { + key = strings.Join([]string{c.GetURL().GetParam(constant.ConfigNamespaceKey, config_center.DefaultGroup), key}, "/") qualifiedKey := buildPath(c.rootPath, key) c.cacheListener.AddListener(qualifiedKey, listener) } diff --git a/imports/imports.go b/imports/imports.go index 3b5601ad8..277e74cee 100644 --- a/imports/imports.go +++ b/imports/imports.go @@ -34,6 +34,7 @@ import ( _ "dubbo.apache.org/dubbo-go/v3/cluster/loadbalance/roundrobin" _ "dubbo.apache.org/dubbo-go/v3/cluster/router/meshrouter" _ "dubbo.apache.org/dubbo-go/v3/cluster/router/polaris" + _ "dubbo.apache.org/dubbo-go/v3/cluster/router/tag" _ "dubbo.apache.org/dubbo-go/v3/config_center/nacos" _ "dubbo.apache.org/dubbo-go/v3/config_center/zookeeper" _ "dubbo.apache.org/dubbo-go/v3/filter/accesslog" diff --git a/registry/service_instance.go b/registry/service_instance.go index ca1a8ff12..7c3c57c4a 100644 --- a/registry/service_instance.go +++ b/registry/service_instance.go @@ -19,6 +19,7 @@ package registry import ( "encoding/json" + url2 "net/url" "strconv" ) @@ -70,6 +71,9 @@ type ServiceInstance interface { // SetServiceMetadata saves metadata in instance SetServiceMetadata(info *common.MetadataInfo) + + // GetTag will return the tag of the instance + GetTag() string } // nolint @@ -92,6 +96,7 @@ type DefaultServiceInstance struct { Address string GroupName string endpoints []*Endpoint `json:"-"` + Tag string } // GetID will return this instance's id. It should be unique. @@ -142,6 +147,10 @@ func (d *DefaultServiceInstance) SetServiceMetadata(m *common.MetadataInfo) { d.ServiceMetadata = m } +func (d *DefaultServiceInstance) GetTag() string { + return d.Tag +} + // ToURLs return a list of url. func (d *DefaultServiceInstance) ToURLs(service *common.ServiceInfo) []*common.URL { urls := make([]*common.URL, 0, 8) @@ -158,7 +167,8 @@ func (d *DefaultServiceInstance) ToURLs(service *common.ServiceInfo) []*common.U url := common.NewURLWithOptions(common.WithProtocol(service.Protocol), common.WithIp(d.Host), common.WithPort(strconv.Itoa(endpoint.Port)), common.WithPath(service.Name), common.WithInterface(service.Name), - common.WithMethods(service.GetMethods()), common.WithParams(service.GetParams())) + common.WithMethods(service.GetMethods()), common.WithParams(service.GetParams()), + common.WithParams(url2.Values{constant.Tagkey: {d.Tag}})) urls = append(urls, url) } } @@ -166,7 +176,8 @@ func (d *DefaultServiceInstance) ToURLs(service *common.ServiceInfo) []*common.U url := common.NewURLWithOptions(common.WithProtocol(service.Protocol), common.WithIp(d.Host), common.WithPort(strconv.Itoa(d.Port)), common.WithPath(service.Name), common.WithInterface(service.Name), - common.WithMethods(service.GetMethods()), common.WithParams(service.GetParams())) + common.WithMethods(service.GetMethods()), common.WithParams(service.GetParams()), + common.WithParams(url2.Values{constant.Tagkey: {d.Tag}})) urls = append(urls, url) } return urls @@ -198,6 +209,7 @@ func (d *DefaultServiceInstance) Copy(endpoint *Endpoint) ServiceInstance { Healthy: d.Healthy, Metadata: d.Metadata, ServiceMetadata: d.ServiceMetadata, + Tag: d.Tag, } dn.ID = d.GetAddress() return dn diff --git a/registry/zookeeper/service_discovery.go b/registry/zookeeper/service_discovery.go index faafacfa5..5a5d7be0c 100644 --- a/registry/zookeeper/service_discovery.go +++ b/registry/zookeeper/service_discovery.go @@ -299,6 +299,7 @@ func (zksd *zookeeperServiceDiscovery) toCuratorInstance(instance registry.Servi Port: instance.GetPort(), Payload: pl, RegistrationTimeUTC: 0, + Tag: instance.GetTag(), } return cuis } @@ -327,5 +328,6 @@ func toZookeeperInstance(cris *curator_discovery.ServiceInstance) registry.Servi Enable: true, Healthy: true, Metadata: md, + Tag: cris.Tag, } } diff --git a/remoting/zookeeper/curator_discovery/service_instance.go b/remoting/zookeeper/curator_discovery/service_instance.go index 667ccc015..059d00c3e 100644 --- a/remoting/zookeeper/curator_discovery/service_instance.go +++ b/remoting/zookeeper/curator_discovery/service_instance.go @@ -26,4 +26,5 @@ type ServiceInstance struct { Port int `json:"port,omitempty"` Payload interface{} `json:"payload,omitempty"` RegistrationTimeUTC int64 `json:"registrationTimeUTC,omitempty"` + Tag string `json:"tag,omitempty"` }