This is an automated email from the ASF dual-hosted git repository.
mark4z pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/dubbo-go-pixiu.git
The following commit(s) were added to refs/heads/develop by this push:
new a494b710 ospp: Feature/traffic (#496)
a494b710 is described below
commit a494b710bcbce60578e22694936b61a9b869b499
Author: 祭酒 <[email protected]>
AuthorDate: Fri Sep 16 15:51:39 2022 +0800
ospp: Feature/traffic (#496)
* refactor: TrafficFilter improve
* fix: declare constants with an explicit type.
* fix: update traffic.go following baerrewang's suggestion.
* traffic.go update
* fix: function name change in traffic filter.
---
pixiu/pkg/filter/traffic/models.go | 56 ++-------------------
pixiu/pkg/filter/traffic/traffic.go | 98 +++++++++++++++++++++++++++++--------
2 files changed, 82 insertions(+), 72 deletions(-)
diff --git a/pixiu/pkg/filter/traffic/models.go b/pixiu/pkg/filter/traffic/models.go
index 3d970c93..eb6025df 100644
--- a/pixiu/pkg/filter/traffic/models.go
+++ b/pixiu/pkg/filter/traffic/models.go
@@ -18,61 +18,13 @@
package traffic
import (
- "math/rand"
"net/http"
- "strconv"
- "time"
)
-type splitAction func(header string, expectedValue string, r *http.Request)
bool
-
-type ActionWrapper struct {
- name CanaryHeaders
- action splitAction
-}
-
-type Actions struct {
- wrappers []*ActionWrapper
-}
-
-var actions Actions
-
-func initActions() {
- // canaryByHeader
- headerAction := &ActionWrapper{
- name: canaryByHeader,
- action: func(header string, value string, r *http.Request) bool
{
- if v := r.Header.Get(header); v != "" {
- return v == value
- }
- return false
- },
- }
-
- // canaryWeight
- weightAction := &ActionWrapper{
- name: canaryWeight,
- action: func(header string, value string, r *http.Request) bool
{
- rand.Seed(time.Now().UnixNano())
- num := rand.Intn(100) + 1
- intValue, _ := strconv.Atoi(value)
- return num <= intValue
- },
- }
-
- actions.wrappers = append(actions.wrappers, headerAction)
- actions.wrappers = append(actions.wrappers, weightAction)
+func spiltHeader(req *http.Request, value string) bool {
+ return req.Header.Get(string(canaryByHeader)) == value
}
-func spilt(request *http.Request, rules map[CanaryHeaders]*Cluster) *Cluster {
- for _, wrapper := range actions.wrappers {
- // call action, decide to return cluster or not
- if cluster, exist := rules[wrapper.name]; exist {
- header, value := cluster.Canary, cluster.Value
- if wrapper.action(header, value, request) {
- return cluster
- }
- }
- }
- return nil
+func spiltWeight(weight, floor, ceil int) bool {
+ return weight >= floor && weight < ceil
}
diff --git a/pixiu/pkg/filter/traffic/traffic.go b/pixiu/pkg/filter/traffic/traffic.go
index ecdaa5d8..5ce6a3be 100644
--- a/pixiu/pkg/filter/traffic/traffic.go
+++ b/pixiu/pkg/filter/traffic/traffic.go
@@ -18,7 +18,11 @@
package traffic
import (
+ "fmt"
+ "math/rand"
+ "strconv"
"strings"
+ "time"
)
import (
@@ -30,7 +34,7 @@ import (
const (
canaryByHeader CanaryHeaders = "canary-by-header"
- canaryWeight CanaryHeaders = "canary-weight"
+ unInitialize int = -1
)
func init() {
@@ -48,19 +52,27 @@ type (
}
Filter struct {
- Rules map[CanaryHeaders]*Cluster
+ weight int
+ record map[string]struct{}
+ Rules []*ClusterWrapper
}
Config struct {
Traffics []*Cluster `yaml:"traffics" json:"traffics"
mapstructure:"traffics"`
}
+ ClusterWrapper struct {
+ Cluster *Cluster
+ header string
+ weightCeil int
+ weightFloor int
+ }
+
Cluster struct {
- Name string `yaml:"name" json:"name"
mapstructure:"name"`
- Router string `yaml:"router" json:"router"
mapstructure:"router"`
- Model CanaryHeaders `yaml:"model" json:"model"
mapstructure:"model"` // canary model
- Canary string `yaml:"canary" json:"canary"
mapstructure:"canary"` // header key
- Value string `yaml:"value" json:"value"
mapstructure:"value"` // header value
+ Name string `yaml:"name" json:"name"
mapstructure:"name"`
+ Router string `yaml:"router" json:"router"
mapstructure:"router"`
+ CanaryByHeader string `yaml:"canary-by-header"
json:"canary-by-header" mapstructure:"canary-by-header"`
+ CanaryWeight string `yaml:"canary-weight"
json:"canary-weight" mapstructure:"canary-weight"`
}
)
@@ -79,23 +91,27 @@ func (factory *FilterFactory) Config() interface{} {
}
func (factory *FilterFactory) Apply() error {
- initActions()
return nil
}
func (factory *FilterFactory) PrepareFilterChain(ctx *http.HttpContext, chain
filter.FilterChain) error {
- f := &Filter{}
- f.Rules = factory.rulesMatch(ctx.Request.RequestURI)
+ f := &Filter{
+ weight: unInitialize,
+ record: map[string]struct{}{},
+ }
+ f.Rules = factory.rulesMatch(f, ctx.Request.RequestURI)
chain.AppendDecodeFilters(f)
return nil
}
func (f *Filter) Decode(ctx *http.HttpContext) filter.FilterStatus {
if f.Rules != nil {
- cluster := spilt(ctx.Request, f.Rules)
- if cluster != nil {
- ctx.Route.Cluster = cluster.Name
- logger.Debugf("[dubbo-go-pixiu] execute traffic split
to cluster %s", cluster.Name)
+ for _, wp := range f.Rules {
+ if f.traffic(wp, ctx) {
+ ctx.Route.Cluster = wp.Cluster.Name
+ logger.Debugf("[dubbo-go-pixiu] execute traffic
split to cluster %s", wp.Cluster.Name)
+ break
+ }
}
} else {
logger.Warnf("[dubbo-go-pixiu] execute traffic split fail
because of empty rules.")
@@ -103,17 +119,59 @@ func (f *Filter) Decode(ctx *http.HttpContext)
filter.FilterStatus {
return filter.Continue
}
-func (factory *FilterFactory) rulesMatch(path string)
map[CanaryHeaders]*Cluster {
+func (f *Filter) traffic(c *ClusterWrapper, ctx *http.HttpContext) bool {
+ if f.weight == unInitialize {
+ rand.Seed(time.Now().UnixNano())
+ f.weight = rand.Intn(100) + 1
+ }
+
+ res := false
+ if c.header != "" {
+ res = spiltHeader(ctx.Request, c.header)
+ } else if !res && c.weightFloor != -1 && c.weightCeil != -1 {
+ res = spiltWeight(f.weight, c.weightFloor, c.weightCeil)
+ }
+ return res
+}
+
+func (factory *FilterFactory) rulesMatch(f *Filter, path string)
[]*ClusterWrapper {
clusters := factory.cfg.Traffics
if clusters != nil {
- mp := make(map[CanaryHeaders]*Cluster)
- for i := range clusters {
- if strings.HasPrefix(clusters[i].Router, path) {
- mp[clusters[i].Model] = clusters[i]
+ rules := make([]*ClusterWrapper, 0)
+ up := 0
+ for _, cluster := range clusters {
+ if strings.HasPrefix(path, cluster.Router) {
+ wp := &ClusterWrapper{
+ Cluster: cluster,
+ header: "",
+ weightCeil: -1,
+ weightFloor: -1,
+ }
+ if cluster.CanaryByHeader != "" {
+ if _, ok :=
f.record[cluster.CanaryByHeader]; ok {
+ logger.Errorf("Duplicate
canary-by-header values")
+ } else {
+
f.record[cluster.CanaryByHeader] = struct{}{}
+ wp.header =
cluster.CanaryByHeader
+ }
+ }
+ if cluster.CanaryWeight != "" {
+ val, err :=
strconv.Atoi(cluster.CanaryWeight)
+ if err != nil || val <= 0 {
+
logger.Errorf(fmt.Sprintf("Wrong canary-weight value: %v",
cluster.CanaryWeight))
+ }
+ wp.weightFloor = up
+ up += val
+ if up > 100 {
+ logger.Errorf("[dubbo-go-pixiu]
clusters' weight sum more than 100 in %v service!", cluster.Router)
+ }
+ wp.weightCeil = up
+ }
+ rules = append(rules, wp)
}
}
- return mp
+ return rules
}
return nil
}