This is an automated email from the ASF dual-hosted git repository.

mark4z pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/dubbo-go-pixiu.git


The following commit(s) were added to refs/heads/develop by this push:
     new a494b710 ospp: Feature/traffic (#496)
a494b710 is described below

commit a494b710bcbce60578e22694936b61a9b869b499
Author: 祭酒 <[email protected]>
AuthorDate: Fri Sep 16 15:51:39 2022 +0800

    ospp: Feature/traffic (#496)
    
    * refactor: TrafficFilter improve
    
    * fix: constant declare with explicit type.
    
    * fix: traffic.go follow by baerrewang's suggestion.
    
    * traffic.go update
    
    * fix: function name change in traffic filter.
---
 pixiu/pkg/filter/traffic/models.go  | 56 ++-------------------
 pixiu/pkg/filter/traffic/traffic.go | 98 +++++++++++++++++++++++++++++--------
 2 files changed, 82 insertions(+), 72 deletions(-)

diff --git a/pixiu/pkg/filter/traffic/models.go b/pixiu/pkg/filter/traffic/models.go
index 3d970c93..eb6025df 100644
--- a/pixiu/pkg/filter/traffic/models.go
+++ b/pixiu/pkg/filter/traffic/models.go
@@ -18,61 +18,13 @@
 package traffic
 
 import (
-       "math/rand"
        "net/http"
-       "strconv"
-       "time"
 )
 
-type splitAction func(header string, expectedValue string, r *http.Request) 
bool
-
-type ActionWrapper struct {
-       name   CanaryHeaders
-       action splitAction
-}
-
-type Actions struct {
-       wrappers []*ActionWrapper
-}
-
-var actions Actions
-
-func initActions() {
-       // canaryByHeader
-       headerAction := &ActionWrapper{
-               name: canaryByHeader,
-               action: func(header string, value string, r *http.Request) bool 
{
-                       if v := r.Header.Get(header); v != "" {
-                               return v == value
-                       }
-                       return false
-               },
-       }
-
-       // canaryWeight
-       weightAction := &ActionWrapper{
-               name: canaryWeight,
-               action: func(header string, value string, r *http.Request) bool 
{
-                       rand.Seed(time.Now().UnixNano())
-                       num := rand.Intn(100) + 1
-                       intValue, _ := strconv.Atoi(value)
-                       return num <= intValue
-               },
-       }
-
-       actions.wrappers = append(actions.wrappers, headerAction)
-       actions.wrappers = append(actions.wrappers, weightAction)
+func spiltHeader(req *http.Request, value string) bool {
+       return req.Header.Get(string(canaryByHeader)) == value
 }
 
-func spilt(request *http.Request, rules map[CanaryHeaders]*Cluster) *Cluster {
-       for _, wrapper := range actions.wrappers {
-               // call action, decide to return cluster or not
-               if cluster, exist := rules[wrapper.name]; exist {
-                       header, value := cluster.Canary, cluster.Value
-                       if wrapper.action(header, value, request) {
-                               return cluster
-                       }
-               }
-       }
-       return nil
+func spiltWeight(weight, floor, ceil int) bool {
+       return weight >= floor && weight < ceil
 }
diff --git a/pixiu/pkg/filter/traffic/traffic.go b/pixiu/pkg/filter/traffic/traffic.go
index ecdaa5d8..5ce6a3be 100644
--- a/pixiu/pkg/filter/traffic/traffic.go
+++ b/pixiu/pkg/filter/traffic/traffic.go
@@ -18,7 +18,11 @@
 package traffic
 
 import (
+       "fmt"
+       "math/rand"
+       "strconv"
        "strings"
+       "time"
 )
 
 import (
@@ -30,7 +34,7 @@ import (
 
 const (
        canaryByHeader CanaryHeaders = "canary-by-header"
-       canaryWeight   CanaryHeaders = "canary-weight"
+       unInitialize   int           = -1
 )
 
 func init() {
@@ -48,19 +52,27 @@ type (
        }
 
        Filter struct {
-               Rules map[CanaryHeaders]*Cluster
+               weight int
+               record map[string]struct{}
+               Rules  []*ClusterWrapper
        }
 
        Config struct {
                Traffics []*Cluster `yaml:"traffics" json:"traffics" 
mapstructure:"traffics"`
        }
 
+       ClusterWrapper struct {
+               Cluster     *Cluster
+               header      string
+               weightCeil  int
+               weightFloor int
+       }
+
        Cluster struct {
-               Name   string        `yaml:"name" json:"name" 
mapstructure:"name"`
-               Router string        `yaml:"router" json:"router" 
mapstructure:"router"`
-               Model  CanaryHeaders `yaml:"model" json:"model" 
mapstructure:"model"`    // canary model
-               Canary string        `yaml:"canary" json:"canary" 
mapstructure:"canary"` // header key
-               Value  string        `yaml:"value" json:"value" 
mapstructure:"value"`    // header value
+               Name           string `yaml:"name" json:"name" 
mapstructure:"name"`
+               Router         string `yaml:"router" json:"router" 
mapstructure:"router"`
+               CanaryByHeader string `yaml:"canary-by-header" 
json:"canary-by-header" mapstructure:"canary-by-header"`
+               CanaryWeight   string `yaml:"canary-weight" 
json:"canary-weight" mapstructure:"canary-weight"`
        }
 )
 
@@ -79,23 +91,27 @@ func (factory *FilterFactory) Config() interface{} {
 }
 
 func (factory *FilterFactory) Apply() error {
-       initActions()
        return nil
 }
 
 func (factory *FilterFactory) PrepareFilterChain(ctx *http.HttpContext, chain 
filter.FilterChain) error {
-       f := &Filter{}
-       f.Rules = factory.rulesMatch(ctx.Request.RequestURI)
+       f := &Filter{
+               weight: unInitialize,
+               record: map[string]struct{}{},
+       }
+       f.Rules = factory.rulesMatch(f, ctx.Request.RequestURI)
        chain.AppendDecodeFilters(f)
        return nil
 }
 
 func (f *Filter) Decode(ctx *http.HttpContext) filter.FilterStatus {
        if f.Rules != nil {
-               cluster := spilt(ctx.Request, f.Rules)
-               if cluster != nil {
-                       ctx.Route.Cluster = cluster.Name
-                       logger.Debugf("[dubbo-go-pixiu] execute traffic split 
to cluster %s", cluster.Name)
+               for _, wp := range f.Rules {
+                       if f.traffic(wp, ctx) {
+                               ctx.Route.Cluster = wp.Cluster.Name
+                               logger.Debugf("[dubbo-go-pixiu] execute traffic 
split to cluster %s", wp.Cluster.Name)
+                               break
+                       }
                }
        } else {
                logger.Warnf("[dubbo-go-pixiu] execute traffic split fail 
because of empty rules.")
@@ -103,17 +119,59 @@ func (f *Filter) Decode(ctx *http.HttpContext) filter.FilterStatus {
        return filter.Continue
 }
 
-func (factory *FilterFactory) rulesMatch(path string) 
map[CanaryHeaders]*Cluster {
+func (f *Filter) traffic(c *ClusterWrapper, ctx *http.HttpContext) bool {
+       if f.weight == unInitialize {
+               rand.Seed(time.Now().UnixNano())
+               f.weight = rand.Intn(100) + 1
+       }
+
+       res := false
+       if c.header != "" {
+               res = spiltHeader(ctx.Request, c.header)
+       } else if !res && c.weightFloor != -1 && c.weightCeil != -1 {
+               res = spiltWeight(f.weight, c.weightFloor, c.weightCeil)
+       }
+       return res
+}
+
+func (factory *FilterFactory) rulesMatch(f *Filter, path string) 
[]*ClusterWrapper {
        clusters := factory.cfg.Traffics
 
        if clusters != nil {
-               mp := make(map[CanaryHeaders]*Cluster)
-               for i := range clusters {
-                       if strings.HasPrefix(clusters[i].Router, path) {
-                               mp[clusters[i].Model] = clusters[i]
+               rules := make([]*ClusterWrapper, 0)
+               up := 0
+               for _, cluster := range clusters {
+                       if strings.HasPrefix(path, cluster.Router) {
+                               wp := &ClusterWrapper{
+                                       Cluster:     cluster,
+                                       header:      "",
+                                       weightCeil:  -1,
+                                       weightFloor: -1,
+                               }
+                               if cluster.CanaryByHeader != "" {
+                                       if _, ok := 
f.record[cluster.CanaryByHeader]; ok {
+                                               logger.Errorf("Duplicate 
canary-by-header values")
+                                       } else {
+                                               
f.record[cluster.CanaryByHeader] = struct{}{}
+                                               wp.header = 
cluster.CanaryByHeader
+                                       }
+                               }
+                               if cluster.CanaryWeight != "" {
+                                       val, err := 
strconv.Atoi(cluster.CanaryWeight)
+                                       if err != nil || val <= 0 {
+                                               
logger.Errorf(fmt.Sprintf("Wrong canary-weight value: %v", 
cluster.CanaryWeight))
+                                       }
+                                       wp.weightFloor = up
+                                       up += val
+                                       if up > 100 {
+                                               logger.Errorf("[dubbo-go-pixiu] 
clusters' weight sum more than 100 in %v service!", cluster.Router)
+                                       }
+                                       wp.weightCeil = up
+                               }
+                               rules = append(rules, wp)
                        }
                }
-               return mp
+               return rules
        }
        return nil
 }

Reply via email to