This is an automated email from the ASF dual-hosted git repository.
liujun pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/dubbo-go.git
The following commit(s) were added to refs/heads/main by this push:
new 4a18afeee update condition route config design v3.1 (#2700)
4a18afeee is described below
commit 4a18afeee52d1ace88afd2faae7d18362247845d
Author: YarBor <[email protected]>
AuthorDate: Fri Jul 19 13:39:37 2024 +0800
update condition route config design v3.1 (#2700)
---
cluster/router/affinity/factory.go | 53 +++
cluster/router/affinity/router.go | 223 ++++++++++
cluster/router/affinity/router_test.go | 231 ++++++++++
cluster/router/condition/dynamic_router.go | 118 ++++--
cluster/router/condition/route.go | 134 +++---
cluster/router/condition/router_test.go | 658 +++++++++++++++++++++--------
common/constant/key.go | 14 +-
config/router_config.go | 57 ++-
8 files changed, 1183 insertions(+), 305 deletions(-)
diff --git a/cluster/router/affinity/factory.go
b/cluster/router/affinity/factory.go
new file mode 100644
index 000000000..fb41497a6
--- /dev/null
+++ b/cluster/router/affinity/factory.go
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package affinity
+
+import (
+ "dubbo.apache.org/dubbo-go/v3/cluster/router"
+ "dubbo.apache.org/dubbo-go/v3/common/constant"
+ "dubbo.apache.org/dubbo-go/v3/common/extension"
+)
+
+// init registers the two affinity router factories with the extension
+// registry: one keyed for service-scoped rules, one for application-scoped
+// rules.
+func init() {
+	extension.SetRouterFactory(
+		constant.AffinityServiceRouterFactoryKey,
+		NewServiceAffinityRouterFactory,
+	)
+	extension.SetRouterFactory(
+		constant.AffinityAppRouterFactoryKey,
+		NewApplicationAffinityRouterFactory,
+	)
+}
+
+// ServiceAffinityRouterFactory creates service-scoped affinity routers.
+type ServiceAffinityRouterFactory struct{}
+
+// NewServiceAffinityRouterFactory returns a ServiceAffinityRouterFactory as a
+// router.PriorityRouterFactory.
+func NewServiceAffinityRouterFactory() router.PriorityRouterFactory {
+	return new(ServiceAffinityRouterFactory)
+}
+
+// NewPriorityRouter builds a new ServiceAffinityRoute. The returned error is
+// always nil.
+func (f ServiceAffinityRouterFactory) NewPriorityRouter() (router.PriorityRouter, error) {
+	route := newServiceAffinityRoute()
+	return route, nil
+}
+
+// ApplicationAffinityRouterFactory creates application-scoped affinity routers.
+type ApplicationAffinityRouterFactory struct{}
+
+// NewApplicationAffinityRouterFactory returns an
+// ApplicationAffinityRouterFactory as a router.PriorityRouterFactory.
+func NewApplicationAffinityRouterFactory() router.PriorityRouterFactory {
+	return new(ApplicationAffinityRouterFactory)
+}
+
+// NewPriorityRouter builds a new ApplicationAffinityRoute. The returned error
+// is always nil.
+func (f ApplicationAffinityRouterFactory) NewPriorityRouter() (router.PriorityRouter, error) {
+	route := newApplicationAffinityRouter()
+	return route, nil
+}
diff --git a/cluster/router/affinity/router.go
b/cluster/router/affinity/router.go
new file mode 100644
index 000000000..1ced13dcd
--- /dev/null
+++ b/cluster/router/affinity/router.go
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package affinity
+
+import (
+ "math"
+ "strings"
+ "sync"
+)
+
+import (
+ "github.com/dubbogo/gost/log/logger"
+
+ "gopkg.in/yaml.v2"
+)
+
+import (
+ "dubbo.apache.org/dubbo-go/v3/cluster/router/condition"
+ "dubbo.apache.org/dubbo-go/v3/common"
+ conf "dubbo.apache.org/dubbo-go/v3/common/config"
+ "dubbo.apache.org/dubbo-go/v3/common/constant"
+ "dubbo.apache.org/dubbo-go/v3/config"
+ "dubbo.apache.org/dubbo-go/v3/config_center"
+ "dubbo.apache.org/dubbo-go/v3/protocol"
+ "dubbo.apache.org/dubbo-go/v3/remoting"
+)
+
+// ServiceAffinityRoute is the affinity router whose rule is subscribed per
+// service. Rule processing and routing come from the embedded affinityRoute.
+type ServiceAffinityRoute struct {
+	affinityRoute
+}
+
+// newServiceAffinityRoute returns a zero-valued router; it stays disabled
+// until Process accepts a rule.
+func newServiceAffinityRoute() *ServiceAffinityRoute {
+	return new(ServiceAffinityRoute)
+}
+
+// Notify subscribes this router to the affinity rule of the service the given
+// invokers belong to, then loads the rule currently stored in the config
+// center through Process.
+//
+// The rule key is the colon-separated key of the first invoker's URL followed
+// by constant.AffinityRuleSuffix. Nothing happens when invokers is empty, when
+// the first invoker has no URL, or when no dynamic configuration is available.
+func (s *ServiceAffinityRoute) Notify(invokers []protocol.Invoker) {
+	if len(invokers) == 0 {
+		return
+	}
+
+	url := invokers[0].GetURL()
+	if url == nil {
+		logger.Error("Failed to notify a Service Affinity rule, because url is empty")
+		return
+	}
+
+	dynamicConfiguration := conf.GetEnvInstance().GetDynamicConfiguration()
+	if dynamicConfiguration == nil {
+		logger.Infof("Config center does not start, Affinity router will not be enabled")
+		return
+	}
+
+	key := strings.Join([]string{url.ColonSeparatedKey(), constant.AffinityRuleSuffix}, "")
+	dynamicConfiguration.AddListener(key, s)
+	value, err := dynamicConfiguration.GetRule(key)
+	if err != nil {
+		logger.Errorf("Failed to query affinity rule, key=%s, err=%v", key, err)
+		return
+	}
+
+	// Feed the current rule through the same path later change events take.
+	s.Process(&config_center.ConfigChangeEvent{Key: key, Value: value, ConfigType: remoting.EventTypeAdd})
+}
+
+// ApplicationAffinityRoute is the affinity router whose rule is subscribed per
+// application. Rule processing and routing come from the embedded
+// affinityRoute.
+type ApplicationAffinityRoute struct {
+	affinityRoute
+	// application is the provider application whose rule key Notify last
+	// subscribed to; it is empty until Notify subscribes for the first time.
+	application string
+	// currentApplication is the name read from this process's application config.
+	currentApplication string
+}
+
+// newApplicationAffinityRouter builds the router for the current application
+// and, when a dynamic configuration is available, registers it as a listener
+// on "<application name><AffinityRuleSuffix>".
+func newApplicationAffinityRouter() *ApplicationAffinityRoute {
+	name := config.GetApplicationConfig().Name
+	route := &ApplicationAffinityRoute{currentApplication: name}
+
+	if dc := conf.GetEnvInstance().GetDynamicConfiguration(); dc != nil {
+		dc.AddListener(strings.Join([]string{name, constant.AffinityRuleSuffix}, ""), route)
+	}
+	return route
+}
+
+// Notify follows the provider application of the given invokers: when it
+// differs from the one currently tracked, the listener on the old rule key is
+// removed, a listener on "<provider application><AffinityRuleSuffix>" is
+// added, and the rule currently stored under that key is loaded through
+// Process.
+//
+// Nothing happens when invokers is empty, the first invoker has no URL, no
+// dynamic configuration is available, the provider application is unknown, or
+// the provider application is this process's own application.
+func (s *ApplicationAffinityRoute) Notify(invokers []protocol.Invoker) {
+	if len(invokers) == 0 {
+		return
+	}
+	url := invokers[0].GetURL()
+	if url == nil {
+		logger.Error("Failed to notify an Application Affinity rule, because url is empty")
+		return
+	}
+
+	dynamicConfiguration := conf.GetEnvInstance().GetDynamicConfiguration()
+	if dynamicConfiguration == nil {
+		logger.Infof("Config center does not start, Affinity router will not be enabled")
+		return
+	}
+
+	providerApplication := url.GetParam("application", "")
+	if providerApplication == "" {
+		logger.Warn("affinity router get providerApplication is empty, will not subscribe to provider app rules.")
+		return
+	}
+	if providerApplication == s.currentApplication {
+		// Reported separately so the log does not claim the name is empty.
+		logger.Warn("affinity router get providerApplication equal to current application, will not subscribe to provider app rules.")
+		return
+	}
+
+	if providerApplication != s.application {
+		if s.application != "" {
+			dynamicConfiguration.RemoveListener(strings.Join([]string{s.application, constant.AffinityRuleSuffix}, ""), s)
+		}
+		s.application = providerApplication
+
+		key := strings.Join([]string{providerApplication, constant.AffinityRuleSuffix}, "")
+		dynamicConfiguration.AddListener(key, s)
+		value, err := dynamicConfiguration.GetRule(key)
+		if err != nil {
+			logger.Errorf("Failed to query affinity rule, key=%s, err=%v", key, err)
+			return
+		}
+
+		s.Process(&config_center.ConfigChangeEvent{Key: key, Value: value, ConfigType: remoting.EventTypeUpdate})
+	}
+}
+
+// affinityRoute holds the currently active affinity rule. Process replaces the
+// rule under the write lock; Route snapshots it under the read lock.
+type affinityRoute struct {
+	// mu guards every field below.
+	mu sync.RWMutex
+	// matcher is built from the rule "<key>=$<key>"; nil while disabled.
+	matcher *condition.FieldMatcher
+	// enabled is true only after Process accepted a valid, enabled rule.
+	enabled bool
+	// key is the trimmed affinityAware.key of the active rule.
+	key string
+	// ratio is the affinityAware.ratio of the active rule, validated to 0-100.
+	ratio int32
+}
+
+// Process applies a config-center change event to this router.
+//
+// Any previously loaded rule is discarded first, so the router ends up
+// disabled when the event is a deletion, when the payload is not a string or
+// cannot be parsed, when affinityAware.ratio is outside 0-100, or when the
+// rule is not enabled or has a blank affinityAware.key. Otherwise a matcher
+// for the rule "<key>=$<key>" is installed and the router is enabled.
+func (a *affinityRoute) Process(event *config_center.ConfigChangeEvent) {
+	a.mu.Lock()
+	defer a.mu.Unlock()
+	a.matcher, a.enabled, a.key, a.ratio = nil, false, "", 0
+
+	if event == nil {
+		return
+	}
+
+	switch event.ConfigType {
+	case remoting.EventTypeDel:
+		// Rule removed: the reset above already disabled the router.
+	case remoting.EventTypeAdd, remoting.EventTypeUpdate:
+		// Checked assertion so a non-string payload cannot panic the listener.
+		content, ok := event.Value.(string)
+		if !ok {
+			logger.Errorf("Failed to parse affinity config, key=%s, unexpected value type %T", event.Key, event.Value)
+			return
+		}
+
+		cfg, err := parseConfig(content)
+		if err != nil {
+			// a.key was just reset, so log the event key instead.
+			logger.Errorf("Failed to parse affinity config, key=%s, err=%v", event.Key, err)
+			return
+		}
+
+		if cfg.AffinityAware.Ratio < 0 || cfg.AffinityAware.Ratio > 100 {
+			// Log the rejected value, not a.ratio (which is 0 after the reset).
+			logger.Errorf("Failed to parse affinity config, affinity.ratio=%d, expect 0-100", cfg.AffinityAware.Ratio)
+			return
+		}
+
+		key := strings.TrimSpace(cfg.AffinityAware.Key)
+		if !cfg.Enabled || key == "" {
+			return
+		}
+		rule := strings.Join([]string{key, key}, "=$")
+		f, err := condition.NewFieldMatcher(rule)
+		if err != nil {
+			logger.Errorf("Failed to parse affinity config, key=%s, rule=%s ,err=%v", event.Key, rule, err)
+			return
+		}
+
+		a.matcher, a.enabled, a.key, a.ratio = &f, true, key, cfg.AffinityAware.Ratio
+	}
+}
+
+// Route narrows invokers to those matching the active affinity rule, provided
+// the matching share reaches the configured ratio (in percent).
+//
+// The input is returned unchanged when it is empty, when no rule is active, or
+// when fewer than ratio percent of the invokers match.
+//
+// NOTE(review): with ratio 0 and no matching invoker the result is an empty
+// slice, exactly as before this rewrite; confirm that is intended.
+func (a *affinityRoute) Route(invokers []protocol.Invoker, url *common.URL, invocation protocol.Invocation) []protocol.Invoker {
+	if len(invokers) == 0 {
+		return invokers
+	}
+
+	// Snapshot the rule so matching runs without holding the lock.
+	a.mu.RLock()
+	enabled, matcher, ratio := a.enabled, a.matcher, a.ratio
+	a.mu.RUnlock()
+
+	if !enabled || matcher == nil {
+		return invokers
+	}
+
+	res := make([]protocol.Invoker, 0, len(invokers))
+	for _, invoker := range invokers {
+		if matcher.MatchInvoker(url, invoker, invocation) {
+			res = append(res, invoker)
+		}
+	}
+
+	// len(res)/len(invokers) >= ratio/100, cross-multiplied so the comparison
+	// is exact instead of depending on float32 rounding.
+	if int64(len(res))*100 >= int64(ratio)*int64(len(invokers)) {
+		return res
+	}
+
+	return invokers
+}
+
+// URL always returns nil; the affinity router carries no URL of its own.
+func (a *affinityRoute) URL() *common.URL {
+	var none *common.URL
+	return none
+}
+
+// Priority returns math.MinInt64. The author expects this to place the router
+// last in the router chain; the ordering itself is decided by the chain.
+func (a *affinityRoute) Priority() int64 {
+	const lowest int64 = math.MinInt64
+	return lowest
+}
+
+// Notify is a placeholder that panics. The types embedding affinityRoute
+// (ServiceAffinityRoute, ApplicationAffinityRoute) define their own Notify,
+// so reaching this one indicates a programming error.
+func (a *affinityRoute) Notify(_ []protocol.Invoker) {
+	const msg = "this function should not be called"
+	panic(msg)
+}
+
+// parseConfig decodes the YAML text c into a config.AffinityRouter. It returns
+// the (possibly partially filled) value together with any unmarshal error.
+func parseConfig(c string) (config.AffinityRouter, error) {
+	var parsed config.AffinityRouter
+	err := yaml.Unmarshal([]byte(c), &parsed)
+	return parsed, err
+}
diff --git a/cluster/router/affinity/router_test.go
b/cluster/router/affinity/router_test.go
new file mode 100644
index 000000000..ced11b6e7
--- /dev/null
+++ b/cluster/router/affinity/router_test.go
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package affinity
+
+import (
+ "dubbo.apache.org/dubbo-go/v3/cluster/router/condition"
+ "dubbo.apache.org/dubbo-go/v3/common"
+ "dubbo.apache.org/dubbo-go/v3/config_center"
+ "dubbo.apache.org/dubbo-go/v3/protocol"
+ "dubbo.apache.org/dubbo-go/v3/protocol/invocation"
+ "dubbo.apache.org/dubbo-go/v3/remoting"
+ "github.com/stretchr/testify/assert"
+ "testing"
+)
+
+// providerUrls is the provider fixture for the routing tests: 40 URLs for
+// com.foo.BarService, 20 on host 127.0.0.1 and 20 on dubbo.apache.org, each
+// host covering the same mix of region (none/beijing/hangzhou) and env
+// (none/gray/normal) parameters.
+var providerUrls = []string{
+ "dubbo://127.0.0.1/com.foo.BarService",
+ "dubbo://127.0.0.1/com.foo.BarService",
+ "dubbo://127.0.0.1/com.foo.BarService?env=normal",
+ "dubbo://127.0.0.1/com.foo.BarService?env=normal",
+ "dubbo://127.0.0.1/com.foo.BarService?env=normal",
+ "dubbo://127.0.0.1/com.foo.BarService?region=beijing",
+ "dubbo://127.0.0.1/com.foo.BarService?region=beijing",
+ "dubbo://127.0.0.1/com.foo.BarService?region=beijing",
+ "dubbo://127.0.0.1/com.foo.BarService?region=beijing&env=gray",
+ "dubbo://127.0.0.1/com.foo.BarService?region=beijing&env=gray",
+ "dubbo://127.0.0.1/com.foo.BarService?region=beijing&env=gray",
+ "dubbo://127.0.0.1/com.foo.BarService?region=beijing&env=gray",
+ "dubbo://127.0.0.1/com.foo.BarService?region=beijing&env=normal",
+ "dubbo://127.0.0.1/com.foo.BarService?region=hangzhou",
+ "dubbo://127.0.0.1/com.foo.BarService?region=hangzhou",
+ "dubbo://127.0.0.1/com.foo.BarService?region=hangzhou&env=gray",
+ "dubbo://127.0.0.1/com.foo.BarService?region=hangzhou&env=gray",
+ "dubbo://127.0.0.1/com.foo.BarService?region=hangzhou&env=normal",
+ "dubbo://127.0.0.1/com.foo.BarService?region=hangzhou&env=normal",
+ "dubbo://127.0.0.1/com.foo.BarService?region=hangzhou&env=normal",
+ "dubbo://dubbo.apache.org/com.foo.BarService",
+ "dubbo://dubbo.apache.org/com.foo.BarService",
+ "dubbo://dubbo.apache.org/com.foo.BarService?env=normal",
+ "dubbo://dubbo.apache.org/com.foo.BarService?env=normal",
+ "dubbo://dubbo.apache.org/com.foo.BarService?env=normal",
+ "dubbo://dubbo.apache.org/com.foo.BarService?region=beijing",
+ "dubbo://dubbo.apache.org/com.foo.BarService?region=beijing",
+ "dubbo://dubbo.apache.org/com.foo.BarService?region=beijing",
+ "dubbo://dubbo.apache.org/com.foo.BarService?region=beijing&env=gray",
+ "dubbo://dubbo.apache.org/com.foo.BarService?region=beijing&env=gray",
+ "dubbo://dubbo.apache.org/com.foo.BarService?region=beijing&env=gray",
+ "dubbo://dubbo.apache.org/com.foo.BarService?region=beijing&env=gray",
+ "dubbo://dubbo.apache.org/com.foo.BarService?region=beijing&env=normal",
+ "dubbo://dubbo.apache.org/com.foo.BarService?region=hangzhou",
+ "dubbo://dubbo.apache.org/com.foo.BarService?region=hangzhou",
+ "dubbo://dubbo.apache.org/com.foo.BarService?region=hangzhou&env=gray",
+ "dubbo://dubbo.apache.org/com.foo.BarService?region=hangzhou&env=gray",
+
"dubbo://dubbo.apache.org/com.foo.BarService?region=hangzhou&env=normal",
+
"dubbo://dubbo.apache.org/com.foo.BarService?region=hangzhou&env=normal",
+
"dubbo://dubbo.apache.org/com.foo.BarService?region=hangzhou&env=normal",
+}
+
+// buildInvokers wraps every entry of providerUrls in a base invoker, keeping
+// the order of providerUrls. It panics if any URL fails to parse.
+func buildInvokers() []protocol.Invoker {
+	invokers := make([]protocol.Invoker, len(providerUrls))
+	for i := range providerUrls {
+		parsed, err := common.NewURL(providerUrls[i])
+		if err != nil {
+			panic(err)
+		}
+		invokers[i] = protocol.NewBaseInvoker(parsed)
+	}
+	return invokers
+}
+
+// newUrl parses s into a *common.URL and panics on a parse error, so test
+// tables can build URLs inline.
+func newUrl(s string) *common.URL {
+	parsed, err := common.NewURL(s)
+	if err == nil {
+		return parsed
+	}
+	panic(err)
+}
+
+// gen_matcher builds a condition.FieldMatcher from the rule text in key and
+// returns a pointer to it; it panics if the rule cannot be parsed.
+func gen_matcher(key string) *condition.FieldMatcher {
+	built, err := condition.NewFieldMatcher(key)
+	if err == nil {
+		return &built
+	}
+	panic(err)
+}
+
+// InvokersFilters is an ordered list of matchers used by the tests to compute
+// the invoker set a rule is expected to select.
+type InvokersFilters []condition.FieldMatcher
+
+// NewINVOKERS_FILTERS returns an empty, non-nil filter list.
+func NewINVOKERS_FILTERS() InvokersFilters {
+	return InvokersFilters{}
+}
+
+// add parses rule into a matcher (panicking on a bad rule) and returns the
+// list with that matcher appended.
+func (INV InvokersFilters) add(rule string) InvokersFilters {
+	return append(INV, *gen_matcher(rule))
+}
+
+// filtrate applies the matchers one after another: each keeps only the
+// invokers it matches, and the survivors feed the next matcher. With no
+// matchers, inv is returned as is.
+func (INV InvokersFilters) filtrate(inv []protocol.Invoker, url *common.URL, invocation protocol.Invocation) []protocol.Invoker {
+	remaining := inv
+	for i := range INV {
+		kept := make([]protocol.Invoker, 0)
+		for _, candidate := range remaining {
+			if INV[i].MatchInvoker(url, candidate, invocation) {
+				kept = append(kept, candidate)
+			}
+		}
+		remaining = kept
+	}
+	return remaining
+}
+
+// Test_affinityRoute_Route loads a rule through Process and checks the result
+// of Route against an independently computed expectation.
+//
+// When a case supplies invokers_filters, the expectation is the filter result
+// if its share of the invokers reaches the loaded ratio, and the unmodified
+// invoker list otherwise (the router's fallback). When invokers_filters is
+// nil, only the number of routed invokers is compared with expectLen.
+func Test_affinityRoute_Route(t *testing.T) {
+	type fields struct {
+		content string
+	}
+	type args struct {
+		invokers   []protocol.Invoker
+		url        *common.URL
+		invocation protocol.Invocation
+	}
+	tests := []struct {
+		name             string
+		fields           fields
+		args             args
+		invokers_filters InvokersFilters
+		expectLen        int
+	}{
+		{
+			name: "test base affinity router",
+			fields: fields{`configVersion: v3.1
+scope: service # Or application
+key: service.apache.com
+enabled: true
+runtime: true
+affinityAware:
+ key: region
+ ratio: 20`},
+			args: args{
+				invokers:   buildInvokers(),
+				url:        newUrl("consumer://127.0.0.1/com.foo.BarService?env=gray&region=beijing"),
+				invocation: invocation.NewRPCInvocation("getComment", nil, nil),
+			},
+			invokers_filters: NewINVOKERS_FILTERS().add("region=$region"),
+		}, {
+			name: "test bad ratio",
+			fields: fields{`configVersion: v3.1
+scope: service # Or application
+key: service.apache.com
+enabled: true
+runtime: true
+affinityAware:
+ key: region
+ ratio: 101`},
+			args: args{
+				invokers:   buildInvokers(),
+				url:        newUrl("consumer://127.0.0.1/com.foo.BarService?env=gray&region=beijing"),
+				invocation: invocation.NewRPCInvocation("getComment", nil, nil),
+			},
+			invokers_filters: NewINVOKERS_FILTERS(),
+		}, {
+			name: "test ratio false",
+			fields: fields{`configVersion: v3.1
+scope: service # Or application
+key: service.apache.com
+enabled: true
+runtime: true
+affinityAware:
+ key: region
+ ratio: 80`},
+			args: args{
+				invokers:   buildInvokers(),
+				url:        newUrl("consumer://127.0.0.1/com.foo.BarService?env=gray&region=beijing"),
+				invocation: invocation.NewRPCInvocation("getComment", nil, nil),
+			},
+			invokers_filters: NewINVOKERS_FILTERS(),
+		}, {
+			name: "test ignore affinity route",
+			fields: fields{`configVersion: v3.1
+scope: service # Or application
+key: service.apache.com
+enabled: true
+runtime: true
+affinityAware:
+ key: bad-key
+ ratio: 80`},
+			args: args{
+				invokers:   buildInvokers(),
+				url:        newUrl("consumer://127.0.0.1/com.foo.BarService?env=gray&region=beijing"),
+				invocation: invocation.NewRPCInvocation("getComment", nil, nil),
+			},
+			invokers_filters: NewINVOKERS_FILTERS(),
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			a := &affinityRoute{}
+			a.Process(&config_center.ConfigChangeEvent{
+				Value:      tt.fields.content,
+				ConfigType: remoting.EventTypeUpdate,
+			})
+			res := a.Route(tt.args.invokers, tt.args.url, tt.args.invocation)
+
+			if tt.invokers_filters == nil {
+				// No filter path given: check the size of what the router returned.
+				assert.Equalf(t, tt.expectLen, len(res), "route(%v, %v, %v)", tt.args.invokers, tt.args.url, tt.args.invocation)
+				return
+			}
+
+			// check expect filtrate path
+			ans := tt.invokers_filters.filtrate(tt.args.invokers, tt.args.url, tt.args.invocation)
+			// Same exact comparison as the router: matched share < ratio percent.
+			if len(ans)*100 < int(a.ratio)*len(tt.args.invokers) {
+				// Below the ratio the router falls back to the full invoker list.
+				assert.Equalf(t, tt.args.invokers, res, "route(%v, %v, %v)", tt.args.invokers, tt.args.url, tt.args.invocation)
+				return
+			}
+			assert.Equalf(t, ans, res, "route(%v, %v, %v)", tt.args.invokers, tt.args.url, tt.args.invocation)
+		})
+	}
+
+}
diff --git a/cluster/router/condition/dynamic_router.go
b/cluster/router/condition/dynamic_router.go
index dd8feca04..7c8aef1ee 100644
--- a/cluster/router/condition/dynamic_router.go
+++ b/cluster/router/condition/dynamic_router.go
@@ -19,7 +19,6 @@ package condition
import (
"fmt"
- "sort"
"strconv"
"strings"
"sync"
@@ -58,20 +57,41 @@ func (p stateRouters) route(invokers []protocol.Invoker,
url *common.URL, invoca
return invokers
}
-type multiplyConditionRoute []*MultiDestRouter
+// multiplyConditionRoute is the v3.1 multi-destination condition rule set.
+type multiplyConditionRoute struct {
+	// trafficDisabled holds request matchers checked before any routing; a
+	// matching request is rejected with an empty invoker list.
+	trafficDisabled []*FieldMatcher
+	// routes are applied in order, each narrowing the invokers left by the
+	// previous one.
+	routes []*MultiDestRouter
+}
-func (m multiplyConditionRoute) route(invokers []protocol.Invoker, url
*common.URL, invocation protocol.Invocation) []protocol.Invoker {
- if len(invokers) == 0 || len(m) == 0 {
- return invokers
+func (m *multiplyConditionRoute) route(invokers []protocol.Invoker, url
*common.URL, invocation protocol.Invocation) []protocol.Invoker {
+ if len(m.trafficDisabled) != 0 {
+ for _, cond := range m.trafficDisabled {
+ if cond.MatchRequest(url, invocation) {
+ logger.Warnf("Request has been disabled %s by
Condition.trafficDisable.match=\"%s\"", url.String(), cond.rule)
+
invocation.SetAttachment(constant.TrafficDisableKey, struct{}{})
+ return []protocol.Invoker{}
+ }
+ }
}
- for _, router := range m {
- res, isMatchWhen := router.Route(invokers, url, invocation)
- if !isMatchWhen || (len(res) == 0 &&
invocation.GetAttachmentInterface(constant.TrafficDisableKey) == nil &&
!router.force) {
- continue
+
+ if len(invokers) != 0 && len(m.routes) != 0 {
+ for _, router := range m.routes {
+ isMatchWhen := false
+ invokers, isMatchWhen = router.Route(invokers, url,
invocation)
+ if !isMatchWhen {
+ continue
+ }
+ if len(invokers) == 0 {
+ routeChains, ok :=
invocation.Attributes()["condition-chain"].([]string)
+ if ok {
+ logger.Errorf("request[%s] route an
empty set in condition-route:: %s", url.String(), strings.Join(routeChains,
"-->"))
+ }
+ return []protocol.Invoker{}
+ }
}
- return res
+ delete(invocation.Attributes(), "condition-chain")
}
- return []protocol.Invoker{}
+
+ return invokers
}
type condRouter interface {
@@ -174,7 +194,7 @@ func generateCondition(rawConfig string) (condRouter, bool,
bool, error) {
}
}
-func generateMultiConditionRoute(rawConfig string) (multiplyConditionRoute,
bool, bool, error) {
+func generateMultiConditionRoute(rawConfig string) (*multiplyConditionRoute,
bool, bool, error) {
routerConfig, err := parseMultiConditionRoute(rawConfig)
if err != nil {
logger.Warnf("[condition router]Build a new condition route
config error, %s and we will use the original condition rule configuration.",
err.Error())
@@ -186,41 +206,42 @@ func generateMultiConditionRoute(rawConfig string)
(multiplyConditionRoute, bool
return nil, false, false, nil
}
+ // remove same condition
+ removeDuplicates(routerConfig.Conditions)
+
conditionRouters := make([]*MultiDestRouter, 0,
len(routerConfig.Conditions))
+ disableMultiConditions := make([]*FieldMatcher, 0)
for _, conditionRule := range routerConfig.Conditions {
- url, err := common.NewURL("condition://")
- if err != nil {
- return nil, false, false, err
+ // removeDuplicates will set nil
+ if conditionRule == nil {
+ continue
+ }
+ url, err1 := common.NewURL("condition://")
+ if err1 != nil {
+ return nil, false, false, err1
}
url.SetAttribute(constant.RuleKey, conditionRule)
- url.AddParam(constant.TrafficDisableKey,
strconv.FormatBool(conditionRule.Disable))
- url.AddParam(constant.ForceKey,
strconv.FormatBool(conditionRule.Force))
- if conditionRule.Priority < 0 {
- logger.Warnf("got
conditionRouteConfig.conditions.priority (%d < 0) is invalid, ignore priority
value, use defatult %d ", conditionRule.Priority, constant.DefaultRoutePriority)
- } else {
- url.AddParam(constant.PriorityKey,
strconv.FormatInt(int64(conditionRule.Priority), 10))
+
+ conditionRoute, err2 := NewConditionMultiDestRouter(url)
+ if err2 != nil {
+ return nil, false, false, err2
}
- if conditionRule.Ratio < 0 || conditionRule.Ratio > 100 {
- logger.Warnf("got conditionRouteConfig.conditions.ratio
(%d) is invalid, hope (0 - 100), ignore ratio value, use defatult %d ",
conditionRule.Ratio, constant.DefaultRouteRatio)
- } else {
- url.AddParam(constant.RatioKey,
strconv.FormatInt(int64(conditionRule.Ratio), 10))
+ // got invalid condition config, continue
+ if conditionRoute == nil {
+ continue
}
-
- conditionRoute, err := NewConditionMultiDestRouter(url)
- if err != nil {
- return nil, false, false, err
+ if conditionRoute.thenCondition != nil &&
len(conditionRoute.thenCondition) != 0 {
+ conditionRouters = append(conditionRouters,
conditionRoute)
+ } else {
+ disableMultiConditions = append(disableMultiConditions,
&conditionRoute.whenCondition)
}
- conditionRouters = append(conditionRouters, conditionRoute)
}
- sort.Slice(conditionRouters, func(i, j int) bool {
- if conditionRouters[i].trafficDisable {
- return true
- }
- return conditionRouters[i].priority >
conditionRouters[j].priority
- })
- return conditionRouters, force, enable, nil
+ return &multiplyConditionRoute{
+ trafficDisabled: disableMultiConditions,
+ routes: conditionRouters,
+ }, force, enable, nil
}
func generateConditionsRoute(rawConfig string) (stateRouters, bool, bool,
error) {
@@ -330,20 +351,20 @@ func (a *ApplicationRouter) Notify(invokers
[]protocol.Invoker) {
return
}
- providerApplicaton := url.GetParam("application", "")
- if providerApplicaton == "" || providerApplicaton ==
a.currentApplication {
+ providerApplication := url.GetParam("application", "")
+ if providerApplication == "" || providerApplication ==
a.currentApplication {
logger.Warn("condition router get providerApplication is empty,
will not subscribe to provider app rules.")
return
}
- if providerApplicaton != a.application {
+ if providerApplication != a.application {
if a.application != "" {
dynamicConfiguration.RemoveListener(strings.Join([]string{a.application,
constant.ConditionRouterRuleSuffix}, ""), a)
}
- key := strings.Join([]string{providerApplicaton,
constant.ConditionRouterRuleSuffix}, "")
+ key := strings.Join([]string{providerApplication,
constant.ConditionRouterRuleSuffix}, "")
dynamicConfiguration.AddListener(key, a)
- a.application = providerApplicaton
+ a.application = providerApplication
value, err := dynamicConfiguration.GetRule(key)
if err != nil {
logger.Errorf("Failed to query condition rule, key=%s,
err=%v", key, err)
@@ -352,3 +373,16 @@ func (a *ApplicationRouter) Notify(invokers
[]protocol.Invoker) {
a.Process(&config_center.ConfigChangeEvent{Key: key, Value:
value, ConfigType: remoting.EventTypeUpdate})
}
}
+
+// removeDuplicates de-duplicates rules in place: every rule that Equal()s an
+// earlier non-nil rule is overwritten with nil. The slice length does not
+// change, so callers must skip nil entries.
+func removeDuplicates(rules []*config.ConditionRule) {
+	for i, kept := range rules {
+		if kept == nil {
+			continue
+		}
+		for j := i + 1; j < len(rules); j++ {
+			if later := rules[j]; later != nil && kept.Equal(later) {
+				rules[j] = nil
+			}
+		}
+	}
+}
diff --git a/cluster/router/condition/route.go
b/cluster/router/condition/route.go
index 55e85bdfd..cbe2995fb 100644
--- a/cluster/router/condition/route.go
+++ b/cluster/router/condition/route.go
@@ -299,63 +299,84 @@ func parseConditionRoute(routeContent string)
(*config.RouterConfig, error) {
return routerConfig, nil
}
+// FieldMatcher pairs a rule's source text with the per-key matchers parsed
+// from it.
+type FieldMatcher struct {
+	// rule is the original rule text, kept for logging.
+	rule string
+	// match is the result of parseRule(rule).
+	match map[string]matcher.Matcher
+}
+
+// NewFieldMatcher parses rule with parseRule and returns the resulting
+// matcher, or a zero FieldMatcher and the parse error.
+func NewFieldMatcher(rule string) (FieldMatcher, error) {
+	parsed, err := parseRule(rule)
+	if err != nil {
+		return FieldMatcher{}, err
+	}
+	built := FieldMatcher{match: parsed, rule: rule}
+	return built, nil
+}
+
+// MatchRequest reports whether the request, described by url and invocation,
+// satisfies the rule.
+func (m *FieldMatcher) MatchRequest(url *common.URL, invocation protocol.Invocation) bool {
+	matched := doMatch(url, nil, invocation, m.match, true)
+	return matched
+}
+
+// MatchInvoker reports whether ivk's URL satisfies the rule, with url passed
+// to doMatch as its second argument. The invocation argument is not used:
+// doMatch is called with a nil invocation.
+func (m *FieldMatcher) MatchInvoker(url *common.URL, ivk protocol.Invoker, invocation protocol.Invocation) bool {
+	target := ivk.GetURL()
+	return doMatch(target, url, nil, m.match, false)
+}
+
// MultiDestRouter Multiply-Destination-Router
type MultiDestRouter struct {
- whenCondition map[string]matcher.Matcher
- trafficDisable bool
- thenCondition []condSet
- ratio int
- priority int
- force bool
+ whenCondition FieldMatcher
+ thenCondition []condSet
}
type condSet struct {
- cond map[string]matcher.Matcher
+ FieldMatcher
subSetWeight int
}
-func newCondSet(cond map[string]matcher.Matcher, subSetWeight int) *condSet {
+func newCondSet(rule string, subSetWeight int) (condSet, error) {
if subSetWeight <= 0 {
subSetWeight = constant.DefaultRouteConditionSubSetWeight
}
- return &condSet{cond: cond, subSetWeight: subSetWeight}
+ m, err := NewFieldMatcher(rule)
+ if err != nil {
+ return condSet{}, err
+ }
+ return condSet{FieldMatcher: m, subSetWeight: subSetWeight}, nil
+}
+
+// destination is one candidate target of a multi-destination rule.
+type destination struct {
+	// matchRule is the rule text that selected ivks.
+	matchRule string
+	// weight is this destination's share in the weighted random pick.
+	weight int
+	// ivks are the invokers that matched matchRule.
+	ivks []protocol.Invoker
+}
type destSets struct {
- dest []struct {
- weight int
- ivks []protocol.Invoker
- }
- weightSum int
+ destinations []*destination
+ weightSum int
}
func newDestSets() *destSets {
return &destSets{
- dest: make([]struct {
- weight int
- ivks []protocol.Invoker
- }, 0),
- weightSum: 0,
+ destinations: []*destination{},
+ weightSum: 0,
}
}
-func (s *destSets) addDest(weight int, ivks []protocol.Invoker) {
- s.dest = append(s.dest, struct {
- weight int
- ivks []protocol.Invoker
- }{weight: weight, ivks: ivks})
+func (s *destSets) addDest(weight int, rule string, ivks []protocol.Invoker) {
+ s.destinations = append(s.destinations, &destination{weight: weight,
matchRule: rule, ivks: ivks})
s.weightSum += weight
}
-func (s *destSets) randDest() []protocol.Invoker {
- if len(s.dest) == 1 {
- return s.dest[0].ivks
+func (s *destSets) randDest() *destination {
+ if s.weightSum == 0 {
+ return nil
+ }
+ if len(s.destinations) == 1 {
+ return s.destinations[0]
}
sum := rand.Intn(s.weightSum)
- for _, d := range s.dest {
+ for _, d := range s.destinations {
sum -= d.weight
if sum <= 0 {
- return d.ivks
+ return d
}
}
return nil
@@ -366,15 +387,10 @@ func (m MultiDestRouter) Route(invokers
[]protocol.Invoker, url *common.URL, inv
return invokers, false
}
- if !doMatch(url, nil, invocation, m.whenCondition, true) {
+ if !m.whenCondition.MatchRequest(url, invocation) {
return invokers, false
}
- if m.trafficDisable {
- invocation.SetAttachment(constant.TrafficDisableKey, struct{}{})
- return []protocol.Invoker{}, true
- }
-
if len(m.thenCondition) == 0 {
logger.Warn("condition state router thenCondition is empty")
return []protocol.Invoker{}, true
@@ -384,23 +400,31 @@ func (m MultiDestRouter) Route(invokers
[]protocol.Invoker, url *common.URL, inv
for _, condition := range m.thenCondition {
res := make([]protocol.Invoker, 0)
for _, invoker := range invokers {
- if doMatch(invoker.GetURL(), url, nil, condition.cond,
false) {
+ if condition.MatchInvoker(url, invoker, invocation) {
res = append(res, invoker)
}
}
if len(res) != 0 {
- destinations.addDest(condition.subSetWeight, res)
+ destinations.addDest(condition.subSetWeight,
condition.rule, res)
}
}
+ // use to print log, if route empty
+ i, ok := invocation.Attributes()["condition-chain"].([]string)
+ if !ok {
+ i = []string{}
+ }
- if len(destinations.dest) != 0 {
- res := destinations.randDest()
- // check x% > m.ratio%
- if len(res)*100/len(invokers) > m.ratio {
- return res, true
- }
+ d := destinations.randDest()
+ if d != nil {
+ invocation.Attributes()["condition-chain"] = append(i,
"request="+m.whenCondition.rule+",invokers="+d.matchRule)
+ return d.ivks, true
}
+ thenRule := make([]string, 0, len(m.thenCondition))
+ for _, set := range m.thenCondition {
+ thenRule = append(thenRule, set.rule)
+ }
+ invocation.Attributes()["condition-chain"] = append(i,
"request="+m.whenCondition.rule+",invokers!="+strings.Join(thenRule, ","))
return []protocol.Invoker{}, true
}
@@ -415,35 +439,31 @@ func NewConditionMultiDestRouter(url *common.URL)
(*MultiDestRouter, error) {
if !ok {
return nil, errors.Errorf("Condition Router can't get the rule
key")
}
- condConf, ok := rawCondConf.(config.ConditionRule)
+ condConf, ok := rawCondConf.(*config.ConditionRule)
if !ok {
return nil, errors.Errorf("Condition Router get the rule key
invaild , got %T", rawCondConf)
}
+ // ensure config effective
+ if (condConf.To == nil || len(condConf.To) == 0) && condConf.From.Match
== "" {
+ return nil, nil
+ }
c := &MultiDestRouter{
- whenCondition: make(map[string]matcher.Matcher),
- thenCondition: make([]condSet, 0, len(condConf.To)),
- trafficDisable: url.GetParamBool(constant.TrafficDisableKey,
false),
- ratio: int(url.GetParamInt32(constant.RatioKey,
constant.DefaultRouteRatio)),
- priority: int(url.GetParamInt32(constant.PriorityKey,
constant.DefaultRoutePriority)),
- force: url.GetParamBool(constant.ForceKey, false),
+ thenCondition: make([]condSet, 0, len(condConf.To)),
}
- m, err := parseRule(condConf.From.Match)
+ var err error
+ c.whenCondition, err = NewFieldMatcher(condConf.From.Match)
if err != nil {
return nil, err
}
- for k, v := range m {
- // if key same, cover
- c.whenCondition[k] = v
- }
for _, ruleTo := range condConf.To {
- cond, err := parseRule(ruleTo.Match)
+ cs, err := newCondSet(ruleTo.Match, ruleTo.Weight)
if err != nil {
return nil, err
}
- c.thenCondition = append(c.thenCondition, *newCondSet(cond,
ruleTo.Weight))
+ c.thenCondition = append(c.thenCondition, cs)
}
return c, nil
diff --git a/cluster/router/condition/router_test.go
b/cluster/router/condition/router_test.go
index 04d67ae6f..ae1727415 100644
--- a/cluster/router/condition/router_test.go
+++ b/cluster/router/condition/router_test.go
@@ -18,6 +18,7 @@
package condition
import (
+ "sync"
"testing"
)
@@ -27,9 +28,10 @@ import (
import (
"dubbo.apache.org/dubbo-go/v3/common"
- "dubbo.apache.org/dubbo-go/v3/common/config"
+ commonConfig "dubbo.apache.org/dubbo-go/v3/common/config"
"dubbo.apache.org/dubbo-go/v3/common/constant"
"dubbo.apache.org/dubbo-go/v3/common/extension"
+ "dubbo.apache.org/dubbo-go/v3/config"
"dubbo.apache.org/dubbo-go/v3/config_center"
"dubbo.apache.org/dubbo-go/v3/config_center/configurator"
"dubbo.apache.org/dubbo-go/v3/protocol"
@@ -750,7 +752,7 @@ conditions:
- 'method=sayHello => region=hangzhou'`,
}
dc, _ := mockFactory.GetDynamicConfiguration(ccURL)
- config.GetEnvInstance().SetDynamicConfiguration(dc)
+ commonConfig.GetEnvInstance().SetDynamicConfiguration(dc)
router := NewServiceRouter()
router.Notify(invokerList)
@@ -795,7 +797,7 @@ conditions:
- 'method=sayHello => region=hangzhou'`,
}
dc, _ := mockFactory.GetDynamicConfiguration(ccURL)
- config.GetEnvInstance().SetDynamicConfiguration(dc)
+ commonConfig.GetEnvInstance().SetDynamicConfiguration(dc)
router := NewApplicationRouter()
router.Notify(invokerList)
@@ -864,224 +866,514 @@ func buildInvokers() []protocol.Invoker {
return res
}
-func TestConditionRoutePriority(t *testing.T) {
- ivks := buildInvokers()
- ar := NewApplicationRouter()
- ar.Process(&config_center.ConfigChangeEvent{Key: "", Value:
`configVersion: v3.1
+func Test_parseMultiConditionRoute(t *testing.T) {
+ type args struct {
+ routeContent string
+ }
+ tests := []struct {
+ name string
+ args args
+ want *config.ConditionRouter
+ wantErr assert.ErrorAssertionFunc
+ }{
+ {name: "testParseConfig", args: args{`configVersion: v3.1
scope: service
+key: org.apache.dubbo.samples.CommentService
force: false
runtime: true
enabled: true
-key: shop
+
+#######
conditions:
- from:
- match:
+ match: tag=tag1 # disable traffic
+ - from:
+ match: tag=gray
to:
- - match: region=$region & version=v1
- - match: region=$region & version=v2
- weight: 200
- - match: region=$region & version=v3
- weight: 300
- force: false
- ratio: 20
- priority: 20
- - from:
- match:
- region=beijing & version=v1
+ - match: tag!=gray
+ weight: 100
+ - match: tag=gray
+ weight: 900
+ - from:
+ match: version=v1
to:
- - match: env=$env & region=beijing
- force: false
- priority: 100
-`, ConfigType: remoting.EventTypeUpdate})
- consumerUrl, err :=
common.NewURL("consumer://127.0.0.1/com.foo.BarService?env=gray&region=beijing&version=v1")
- if err != nil {
- panic(err)
- }
- got := ar.Route(ivks, consumerUrl,
invocation.NewRPCInvocation("getComment", nil, nil))
- expLen := 0
- for _, ivk := range ivks {
- if ivk.GetURL().GetParam("region", "") == "beijing" && "gray"
== ivk.GetURL().GetParam("env", "") {
- expLen++
- }
+ - match: version=v1`}, want: &config.ConditionRouter{
+ Scope: "service",
+ Key: "org.apache.dubbo.samples.CommentService",
+ Force: false,
+ Runtime: true,
+ Enabled: true,
+ Conditions: []*config.ConditionRule{
+ {
+ From: config.ConditionRuleFrom{Match:
"tag=tag1"},
+ To: nil,
+ }, {
+ From: config.ConditionRuleFrom{
+ Match: `tag=gray`,
+ },
+ To: []config.ConditionRuleTo{{
+ Match: `tag!=gray`,
+ Weight: 100,
+ }, {
+ Match: `tag=gray`,
+ Weight: 900,
+ }},
+ }, {
+ From: config.ConditionRuleFrom{
+ Match: `version=v1`,
+ },
+ To: []config.ConditionRuleTo{{
+ Match: `version=v1`,
+ Weight: 0,
+ }},
+ }},
+ }},
}
- if len(ivks)*100/expLen <= 20 {
- expLen = 0
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ got, err :=
parseMultiConditionRoute(tt.args.routeContent)
+ assert.Nil(t, err)
+ assert.Equalf(t, tt.want, got,
"parseMultiConditionRoute(%v)", tt.args.routeContent)
+ })
}
- assert.Equal(t, expLen, len(got))
}
-func TestConditionRouteTrafficDisable(t *testing.T) {
- ivks := buildInvokers()
- ar := NewApplicationRouter()
- ar.Process(&config_center.ConfigChangeEvent{Key: "", Value:
`configVersion: v3.1
-scope: service
-force: true
-runtime: true
-enabled: true
-key: shop
-conditions:
- - from:
- match:
- to:
- - match: region=$region & version=v1
- - match: region=$region & version=v2
- weight: 200
- - match: region=$region & version=v3
- weight: 300
- force: false
- ratio: 20
- priority: 20
- - from:
- match:
- region=beijing & version=v1
- to:
- force: true
- ratio: 20
- priority: 100
-`, ConfigType: remoting.EventTypeUpdate})
- consumerUrl, err :=
common.NewURL("consumer://127.0.0.1/com.foo.BarService?env=gray&region=beijing")
+func genMatcher(rule string) FieldMatcher {
+ cond, err := parseRule(rule)
if err != nil {
panic(err)
}
- got := ar.Route(ivks, consumerUrl, invocation.NewRPCInvocation("echo",
nil, nil))
- assert.Equal(t, 0, len(got))
+ m := FieldMatcher{
+ rule: rule,
+ match: cond,
+ }
+ return m
}
-func TestConditionRouteRegionPriority(t *testing.T) {
- ivks := buildInvokers()
- ar := NewApplicationRouter()
- ar.Process(&config_center.ConfigChangeEvent{Key: "", Value:
`configVersion: v3.1
-scope: service
-force: true
-runtime: true
-enabled: true
-key: shop
-conditions:
- - from:
- match:
- to:
- - match: region=$region & env=$env
-`, ConfigType: remoting.EventTypeUpdate})
- consumerUrl, err :=
common.NewURL("consumer://127.0.0.1/com.foo.BarService?env=gray&region=beijing")
- if err != nil {
- panic(err)
- }
- got := ar.Route(ivks, consumerUrl,
invocation.NewRPCInvocation("getComment", nil, nil))
- expLen := 0
- for _, ivk := range ivks {
- if ivk.GetURL().GetRawParam("env") ==
consumerUrl.GetRawParam("env") &&
- ivk.GetURL().GetRawParam("region") ==
consumerUrl.GetRawParam("region") {
- expLen++
+type InvokersFilters []FieldMatcher
+
+func NewINVOKERS_FILTERS() InvokersFilters {
+ return []FieldMatcher{}
+}
+
+func (INV InvokersFilters) add(rule string) InvokersFilters {
+ m := genMatcher(rule)
+ return append(INV, m)
+}
+
+func (INV InvokersFilters) filtrate(inv []protocol.Invoker, url *common.URL,
invocation protocol.Invocation) []protocol.Invoker {
+ for _, cond := range INV {
+ tmpInv := make([]protocol.Invoker, 0)
+ for _, invoker := range inv {
+ if cond.MatchInvoker(url, invoker, invocation) {
+ tmpInv = append(tmpInv, invoker)
+ }
}
+ inv = tmpInv
}
- assert.Equal(t, expLen, len(got))
- consumerUrl, err =
common.NewURL("consumer://127.0.0.1/com.foo.BarService?env=gray&region=hangzhou")
+ return inv
+}
+
+func newUrl(url string) *common.URL {
+ res, err := common.NewURL(url)
if err != nil {
panic(err)
}
- got = ar.Route(ivks, consumerUrl,
invocation.NewRPCInvocation("getComment", nil, nil))
- expLen = 0
- for _, ivk := range ivks {
- if ivk.GetURL().GetRawParam("env") ==
consumerUrl.GetRawParam("env") &&
- ivk.GetURL().GetRawParam("region") ==
consumerUrl.GetRawParam("region") {
- expLen++
- }
+ return res
+}
+
+func Test_multiplyConditionRoute_route(t *testing.T) {
+ type args struct {
+ invokers []protocol.Invoker
+ url *common.URL
+ invocation protocol.Invocation
}
- assert.Equal(t, expLen, len(got))
- consumerUrl, err =
common.NewURL("consumer://127.0.0.1/com.foo.BarService?env=normal&region=shanghai")
- if err != nil {
- panic(err)
+ d := DynamicRouter{
+ mu: sync.RWMutex{},
+ force: false,
+ enable: false,
+ conditionRouter: nil,
}
- got = ar.Route(ivks, consumerUrl,
invocation.NewRPCInvocation("getComment", nil, nil))
- expLen = 0
- for _, ivk := range ivks {
- if ivk.GetURL().GetRawParam("region") ==
consumerUrl.GetRawParam("region") &&
- ivk.GetURL().GetRawParam("env") ==
consumerUrl.GetRawParam("env") {
- expLen++
+ tests := []struct {
+ name string
+ content string
+ args args
+ invokers_filters InvokersFilters
+ expResLen int
+ multiDestination []struct {
+ invokers_filters InvokersFilters
+ weight float32
}
- }
- assert.Equal(t, expLen, len(got))
-}
-
-func TestConditionRouteRegionPriorityFail(t *testing.T) {
- ivks := buildInvokers()
- ar := NewApplicationRouter()
- ar.Process(&config_center.ConfigChangeEvent{Key: "", Value:
`configVersion: v3.1
-scope: service
-force: true
-runtime: true
-enabled: true
-key: shop
+ }{
+ {
+ name: "test base condition",
+ content: `configVersion: v3.1
+scope: service
+key: org.apache.dubbo.samples.CommentService
+force: false
+runtime: true
+enabled: true
conditions:
- from:
- match:
+ match: env=gray
to:
- - match: region=$region & env=$env
- ratio: 100
-`, ConfigType: remoting.EventTypeUpdate})
- consumerUrl, err :=
common.NewURL("consumer://127.0.0.1/com.foo.BarService?env=gray&region=beijing")
- if err != nil {
- panic(err)
- }
- got := ar.Route(ivks, consumerUrl,
invocation.NewRPCInvocation("getComment", nil, nil))
- assert.Equal(t, 0, len(got))
-}
-
-func TestConditionRouteMatchFail(t *testing.T) {
- ivks := buildInvokers()
- ar := NewApplicationRouter()
- ar.Process(&config_center.ConfigChangeEvent{Key: "", Value:
`configVersion: v3.1
-scope: service
-force: false
-runtime: true
-enabled: true
-key: shop
+ - match: env!=gray
+ weight: 100
+`,
+ args: args{
+ invokers: buildInvokers(),
+ url:
newUrl("consumer://127.0.0.1/com.foo.BarService?env=gray&region=beijing"),
+ invocation: invocation.NewRPCInvocation("echo",
nil, nil),
+ },
+ invokers_filters:
NewINVOKERS_FILTERS().add("env!=gray"),
+ }, {
+ name: "test removeDuplicates condition",
+ content: `configVersion: v3.1
+scope: service
+key: org.apache.dubbo.samples.CommentService
+force: false
+runtime: true
+enabled: true
conditions:
- from:
- match:
+ match: env=gray
to:
- - match: region=$region & env=$env & err-tag=Err-tag
+ - match: env!=gray
+ weight: 100
- from:
- match:
- trafficDisable: true
+ match: env=gray
to:
- - match:
-`, ConfigType: remoting.EventTypeUpdate})
- consumerUrl, err :=
common.NewURL("consumer://127.0.0.1/com.foo.BarService?env=gray&region=beijing")
- if err != nil {
- panic(err)
- }
- got := ar.Route(ivks, consumerUrl,
invocation.NewRPCInvocation("errMethod", nil, nil))
- assert.Equal(t, 0, len(got))
-}
-
-func TestConditionRouteBanSpecialTraffic(t *testing.T) {
- ivks := buildInvokers()
- ar := NewApplicationRouter()
- ar.Process(&config_center.ConfigChangeEvent{Key: "", Value:
`configVersion: v3.1
-scope: service
-force: true
-runtime: true
-enabled: true
-key: shop
+ - match: env!=gray
+ weight: 100
+`,
+ args: args{
+ invokers: buildInvokers(),
+ url:
newUrl("consumer://127.0.0.1/com.foo.BarService?env=gray&region=beijing"),
+ invocation: invocation.NewRPCInvocation("echo",
nil, nil),
+ },
+ invokers_filters:
NewINVOKERS_FILTERS().add("env!=gray"),
+ }, {
+ name: "test consequent condition",
+ content: `configVersion: v3.1
+scope: service
+key: org.apache.dubbo.samples.CommentService
+force: false
+runtime: true
+enabled: true
conditions:
- from:
match: env=gray
to:
- - match:
- force: true
- priority: 100
+ - match: env!=gray
+ weight: 100
- from:
- match:
+ match: region=beijing
to:
- - match:
- force: true
- priority: 100
-`, ConfigType: remoting.EventTypeUpdate})
- consumerUrl, err :=
common.NewURL("consumer://127.0.0.1/com.foo.BarService?env=gray&region=beijing")
- if err != nil {
- panic(err)
+ - match: region=beijing
+ weight: 100
+ - from:
+ to:
+ - match: host!=127.0.0.1
+`,
+ args: args{
+ invokers: buildInvokers(),
+ url:
newUrl("consumer://127.0.0.1/com.foo.BarService?env=gray&region=beijing"),
+ invocation: invocation.NewRPCInvocation("echo",
nil, nil),
+ },
+ invokers_filters:
NewINVOKERS_FILTERS().add("env!=gray").add(`region=beijing`).add(`host!=127.0.0.1`),
+ }, {
+ name: "test unMatch condition",
+ content: `configVersion: v3.1
+scope: service
+key: org.apache.dubbo.samples.CommentService
+force: false
+runtime: true
+enabled: true
+conditions:
+ - from:
+ match: env!=gray
+ to:
+ - match: env=gray
+ weight: 100
+ - from:
+ match: region!=beijing
+ to:
+ - match: region=beijing
+ weight: 100
+ - from:
+ to:
+ - match: host!=127.0.0.1
+`,
+ args: args{
+ invokers: buildInvokers(),
+ url:
newUrl("consumer://127.0.0.1/com.foo.BarService?env=gray&region=beijing"),
+ invocation: invocation.NewRPCInvocation("echo",
nil, nil),
+ },
+ invokers_filters:
NewINVOKERS_FILTERS().add(`host!=127.0.0.1`),
+ }, {
+ name: "test Match and Route zero",
+ content: `configVersion: v3.1
+scope: service
+key: org.apache.dubbo.samples.CommentService
+force: true # <---
+runtime: true
+enabled: true
+conditions:
+ - from:
+ match: env=gray # match success here
+ to:
+ - match: env=ErrTag # all invoker can't match this
+ weight: 100
+ - from:
+ match: region!=beijing
+ to:
+ - match: region=beijing
+ weight: 100
+ - from:
+ to:
+ - match: host!=127.0.0.1
+`,
+ args: args{
+ invokers: buildInvokers(),
+ url:
newUrl("consumer://127.0.0.1/com.foo.BarService?env=gray&region=beijing"),
+ invocation: invocation.NewRPCInvocation("echo",
nil, nil),
+ },
+ expResLen: 0,
+ }, {
+ name: "test Match, Route zero and ignore ",
+ content: `configVersion: v3.1
+scope: service
+key: org.apache.dubbo.samples.CommentService
+force: false # <--- to ignore bad result
+runtime: true
+enabled: true
+conditions:
+ - from:
+ match: region=beijing
+ to:
+ - match: region!=beijing
+ weight: 100
+ - from:
+ to:
+ - match: host!=127.0.0.1
+ - from:
+ match: env=gray # match success here
+ to:
+ - match: env=ErrTag # all invoker can't match this
+ weight: 100
+`,
+ args: args{
+ invokers: buildInvokers(),
+ url:
newUrl("consumer://127.0.0.1/com.foo.BarService?env=gray&region=beijing"),
+ invocation: invocation.NewRPCInvocation("echo",
nil, nil),
+ },
+ invokers_filters: NewINVOKERS_FILTERS(),
+ }, {
+ name: "test traffic disabled and ignore
condition-route.force",
+ content: `configVersion: v3.1
+scope: service
+key: org.apache.dubbo.samples.CommentService
+force: false
+runtime: true
+enabled: true
+conditions:
+ - from:
+ match: host=127.0.0.1 # <--- disabled
+ - from:
+ match: env=gray
+ to:
+ - match: env!=gray
+ weight: 100
+ - to:
+ - match: region!=beijing
+`,
+ args: args{
+ invokers: buildInvokers(),
+ url:
newUrl("consumer://127.0.0.1/com.foo.BarService?env=gray&region=beijing"),
+ invocation: invocation.NewRPCInvocation("echo",
nil, nil),
+ },
+ expResLen: 0,
+ }, {
+ name: "test multiply destination",
+ content: `configVersion: v3.1
+scope: service
+key: org.apache.dubbo.samples.CommentService
+force: false
+runtime: true
+enabled: true
+conditions:
+ - from:
+ match: env=gray
+ to:
+ - match: env!=gray
+ weight: 100
+ - match: env=gray
+ weight: 900
+ - from:
+ match: region=beijing
+ to:
+ - match: region!=beijing
+ weight: 100
+ - match: region=beijing
+ weight: 200
+`,
+ args: args{
+ invokers: buildInvokers(),
+ url:
newUrl("consumer://127.0.0.1/com.foo.BarService?env=gray&region=beijing"),
+ invocation: invocation.NewRPCInvocation("echo",
nil, nil),
+ },
+ multiDestination: []struct {
+ invokers_filters InvokersFilters
+ weight float32
+ }{{
+ invokers_filters:
NewINVOKERS_FILTERS().add(`env=gray`).add(`region=beijing`),
+ weight: float32(900) / float32(1000)
* float32(200) / float32(300),
+ }, {
+ invokers_filters:
NewINVOKERS_FILTERS().add(`env!=gray`).add(`region=beijing`),
+ weight: float32(100) / float32(1000)
* float32(200) / float32(300),
+ }, {
+ invokers_filters:
NewINVOKERS_FILTERS().add(`env=gray`).add(`region!=beijing`),
+ weight: float32(900) / float32(1000)
* float32(100) / float32(300),
+ }, {
+ invokers_filters:
NewINVOKERS_FILTERS().add(`env!=gray`).add(`region!=beijing`),
+ weight: float32(100) / float32(1000)
* float32(100) / float32(300),
+ }},
+ }, {
+ name: "test multiply destination with ignore some
condition",
+ content: `configVersion: v3.1
+scope: service
+key: org.apache.dubbo.samples.CommentService
+force: false
+runtime: true
+enabled: true
+conditions:
+ - from:
+ match: env=gray
+ to:
+ - match: env!=gray
+ weight: 100
+ - match: env=gray----error # will ignore this subset
+ weight: 900
+ - from:
+ match: region=beijing
+ to:
+ - match: region!=beijing
+ weight: 100
+ - match: region=beijing
+ weight: 200
+`,
+ args: args{
+ invokers: buildInvokers(),
+ url:
newUrl("consumer://127.0.0.1/com.foo.BarService?env=gray&region=beijing"),
+ invocation: invocation.NewRPCInvocation("echo",
nil, nil),
+ },
+ multiDestination: []struct {
+ invokers_filters InvokersFilters
+ weight float32
+ }{{
+ invokers_filters:
NewINVOKERS_FILTERS().add(`env!=gray`).add(`region=beijing`),
+ weight: float32(200) / float32(300),
+ }, {
+ invokers_filters:
NewINVOKERS_FILTERS().add(`env!=gray`).add(`region!=beijing`),
+ weight: float32(100) / float32(300),
+ }},
+ }, {
+ name: "test multiply destination with ignore some
condition node",
+ content: `configVersion: v3.1
+scope: service
+key: org.apache.dubbo.samples.CommentService
+force: false
+runtime: true
+enabled: true
+conditions:
+ - from:
+ match: env=gray
+ to:
+ - match: env!=gray
+ weight: 100
+ - match: env=gray
+ weight: 900
+ - from: # <-- will ignore this condition
+ match: region!=beijing
+ to:
+ - match: env=normal
+ weight: 100
+ - match: env=gray
+ weight: 200
+ - from:
+ match: region=beijing
+ to:
+ - match: region!=beijing
+ weight: 100
+ - match: region=beijing
+ weight: 200
+`,
+ args: args{
+ invokers: buildInvokers(),
+ url:
newUrl("consumer://127.0.0.1/com.foo.BarService?env=gray&region=beijing"),
+ invocation: invocation.NewRPCInvocation("echo",
nil, nil),
+ },
+ multiDestination: []struct {
+ invokers_filters InvokersFilters
+ weight float32
+ }{{
+ invokers_filters:
NewINVOKERS_FILTERS().add(`env=gray`).add(`region=beijing`),
+ weight: float32(900) / float32(1000)
* float32(200) / float32(300),
+ }, {
+ invokers_filters:
NewINVOKERS_FILTERS().add(`env!=gray`).add(`region=beijing`),
+ weight: float32(100) / float32(1000)
* float32(200) / float32(300),
+ }, {
+ invokers_filters:
NewINVOKERS_FILTERS().add(`env=gray`).add(`region!=beijing`),
+ weight: float32(900) / float32(1000)
* float32(100) / float32(300),
+ }, {
+ invokers_filters:
NewINVOKERS_FILTERS().add(`env!=gray`).add(`region!=beijing`),
+ weight: float32(100) / float32(1000)
* float32(100) / float32(300),
+ }},
+ },
+ }
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ d.Process(&config_center.ConfigChangeEvent{
+ Value: tt.content,
+ ConfigType: remoting.EventTypeUpdate,
+ })
+ if tt.multiDestination == nil {
+ res := d.Route(tt.args.invokers, tt.args.url,
tt.args.invocation)
+ if tt.invokers_filters != nil {
+ // check expect filtrate path
+ ans :=
tt.invokers_filters.filtrate(tt.args.invokers, tt.args.url, tt.args.invocation)
+ assert.Equalf(t, ans, res, "route(%v,
%v, %v)", tt.args.invokers, tt.args.url, tt.args.invocation)
+ } else {
+ // check expect result.length
+ assert.Equalf(t, tt.expResLen,
len(res), "route(%v, %v, %v)", tt.args.invokers, tt.args.url,
tt.args.invocation)
+ }
+ } else {
+ // check multiply destination route
successfully or not
+ ans := map[interface{}]float32{}
+ for _, s := range tt.multiDestination {
+ args := struct {
+ invokers []protocol.Invoker
+ url *common.URL
+ invocation protocol.Invocation
+ }{tt.args.invokers[:],
tt.args.url.Clone(), tt.args.invocation}
+
ans[len(s.invokers_filters.filtrate(args.invokers, tt.args.url,
tt.args.invocation))] = s.weight * 1000
+ }
+ res := map[interface{}]int{}
+ for i := 0; i < 1000; i++ {
+ args := struct {
+ invokers []protocol.Invoker
+ url *common.URL
+ invocation protocol.Invocation
+ }{tt.args.invokers[:],
tt.args.url.Clone(), tt.args.invocation}
+ res[len(d.Route(args.invokers,
args.url, args.invocation))]++
+ }
+ for k, v := range ans {
+ if float32(res[k]+50) > v &&
float32(res[k]-50) < v {
+ } else {
+ assert.Fail(t, "out of range")
+ }
+ }
+ }
+ })
}
- got := ar.Route(ivks, consumerUrl,
invocation.NewRPCInvocation("errMethod", nil, nil))
- assert.Equal(t, len(ivks), len(got))
}
diff --git a/common/constant/key.go b/common/constant/key.go
index 21b853989..393036b38 100644
--- a/common/constant/key.go
+++ b/common/constant/key.go
@@ -17,10 +17,6 @@
package constant
-import (
- "math"
-)
-
type DubboCtxKey string
const (
@@ -313,6 +309,7 @@ const (
ScriptRouterRuleSuffix = ".script-router"
TagRouterRuleSuffix = ".tag-router"
ConditionRouterRuleSuffix = ".condition-router" // Specify
condition router suffix
+ AffinityRuleSuffix = ".affinity-router" // Specify
affinity router suffix
MeshRouteSuffix = ".MESHAPPRULE" // Specify mesh
router suffix
ForceUseTag = "dubbo.force.tag" // the tag in
attachment
ForceUseCondition = "dubbo.force.condition"
@@ -320,22 +317,20 @@ const (
ConditionKey = "dubbo.condition"
AttachmentKey = DubboCtxKey("attachment") // key in
context in invoker
TagRouterFactoryKey = "tag"
+ AffinityAppRouterFactoryKey = "application.affinity"
+ AffinityServiceRouterFactoryKey = "service.affinity"
ConditionAppRouterFactoryKey = "provider.condition"
ConditionServiceRouterFactoryKey = "service.condition"
ScriptRouterFactoryKey = "consumer.script"
ForceKey = "force"
TrafficDisableKey = "trafficDisable"
- PriorityKey = "priority"
- RatioKey = "RatioKey"
Arguments = "arguments"
Attachments = "attachments"
Param = "param"
Scope = "scope"
Wildcard = "wildcard"
MeshRouterFactoryKey = "mesh"
- DefaultRouteRatio = 0
DefaultRouteConditionSubSetWeight = 100
- DefaultRoutePriority = 0
)
// Auth filter
@@ -455,7 +450,4 @@ const (
// priority
const (
DefaultPriority = 0
- HighestPriority = math.MinInt32
- // LowestPriority for metadata service
- LowestPriority = math.MaxInt32
)
diff --git a/config/router_config.go b/config/router_config.go
index e8f155789..7971fd58f 100644
--- a/config/router_config.go
+++ b/config/router_config.go
@@ -17,6 +17,10 @@
package config
+import (
+ "reflect"
+)
+
import (
"github.com/creasty/defaults"
)
@@ -50,12 +54,8 @@ type Tag struct {
}
type ConditionRule struct {
- Priority int `default:"0" yaml:"priority"
json:"priority,omitempty" property:"priority"`
- From ConditionRuleFrom `yaml:"from" json:"from,omitempty"
property:"from"`
- Disable bool `default:"false" yaml:"trafficDisable"
json:"trafficDisable,omitempty" property:"trafficDisable"`
- To []ConditionRuleTo `yaml:"to" json:"to,omitempty" property:"to"`
- Ratio int `default:"0" yaml:"ratio"
json:"ratio,omitempty" property:"priority"`
- Force bool `default:"false" yaml:"force"
json:"force,omitempty" property:"force"`
+ From ConditionRuleFrom `yaml:"from" json:"from,omitempty"
property:"from"`
+ To []ConditionRuleTo `yaml:"to" json:"to,omitempty" property:"to"`
}
type ConditionRuleFrom struct {
@@ -67,14 +67,32 @@ type ConditionRuleTo struct {
Weight int `default:"100" yaml:"weight" json:"weight,omitempty"
property:"weight"`
}
+type ConditionRuleDisable struct {
+ Match string `yaml:"match" json:"match,omitempty" property:"match"`
+}
+
+type AffinityAware struct {
+ Key string `default:"" yaml:"key" json:"key,omitempty" property:"key"`
+ Ratio int32 `default:"0" yaml:"ratio" json:"ratio,omitempty"
property:"ratio"`
+}
+
// ConditionRouter -- when RouteConfigVersion == v3.1, decode by this
type ConditionRouter struct {
- Scope string `validate:"required" yaml:"scope"
json:"scope,omitempty" property:"scope"` // must be chosen from `service` and
`application`.
- Key string `validate:"required" yaml:"key"
json:"key,omitempty" property:"key"` // specifies which service or
application the rule body acts on.
- Force bool `default:"false" yaml:"force"
json:"force,omitempty" property:"force"`
- Runtime bool `default:"false" yaml:"runtime"
json:"runtime,omitempty" property:"runtime"`
- Enabled bool `default:"true" yaml:"enabled"
json:"enabled,omitempty" property:"enabled"`
- Conditions []ConditionRule `yaml:"conditions"
json:"conditions,omitempty" property:"conditions"`
+ Scope string `validate:"required" yaml:"scope"
json:"scope,omitempty" property:"scope"` // must be chosen from `service` and
`application`.
+ Key string `validate:"required" yaml:"key"
json:"key,omitempty" property:"key"` // specifies which service or
application the rule body acts on.
+ Force bool `default:"false" yaml:"force"
json:"force,omitempty" property:"force"`
+ Runtime bool `default:"false" yaml:"runtime"
json:"runtime,omitempty" property:"runtime"`
+ Enabled bool `default:"true" yaml:"enabled"
json:"enabled,omitempty" property:"enabled"`
+ Conditions []*ConditionRule `yaml:"conditions"
json:"conditions,omitempty" property:"conditions"`
+}
+
+// AffinityRouter -- RouteConfigVersion == v3.1
+type AffinityRouter struct {
+ Scope string `validate:"required" yaml:"scope"
json:"scope,omitempty" property:"scope"` // must be chosen from `service` and
`application`.
+ Key string `validate:"required" yaml:"key"
json:"key,omitempty" property:"key"` // specifies which service or
application the rule body acts on.
+ Runtime bool `default:"false" yaml:"runtime"
json:"runtime,omitempty" property:"runtime"`
+ Enabled bool `default:"true" yaml:"enabled"
json:"enabled,omitempty" property:"enabled"`
+ AffinityAware AffinityAware `yaml:"affinityAware"
json:"affinityAware,omitempty" property:"affinityAware"`
}
// Prefix dubbo.router
@@ -179,3 +197,18 @@ func (rcb *RouterConfigBuilder) Build() *RouterConfig {
}
return rcb.routerConfig
}
+
+func (x *ConditionRule) Equal(t *ConditionRule) bool {
+ if !reflect.DeepEqual(x.From, t.From) {
+ return false
+ }
+ if len(x.To) != len(t.To) {
+ return false
+ }
+ for i := range x.To {
+ if !reflect.DeepEqual(x.To[i], t.To[i]) {
+ return false
+ }
+ }
+ return true
+}