This is an automated email from the ASF dual-hosted git repository.

ztelur pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/dubbo-go-pixiu.git


The following commit(s) were added to refs/heads/develop by this push:
     new a27282e  feat:load balancer strategy mode and add consistent hash (#373)
a27282e is described below

commit a27282efe858dd557af94f4eef4b72329839dd01
Author: baerwang <[email protected]>
AuthorDate: Fri Mar 25 16:15:29 2022 +0800

    feat:load balancer strategy mode and add consistent hash (#373)
    
    * feat:load balancer strategy mode and add consistent hash
    
    * fix triple samples ci
    
    * fix:go fmt
    
    * style:use gost library consistent hash
    
    * style:temporarily delete consistent hash; wait for the etcd new version upgrade, then add it back
    
    Co-authored-by: mark4z <[email protected]>
---
 pkg/adapter/springcloud/cloud.go                   |  3 +-
 .../zookeeper/application_listener.go              |  2 +-
 .../servicediscovery/zookeeper/service_listener.go |  4 +-
 pkg/adapter/xds/cds.go                             |  9 ++-
 .../loadbalancer/load_balancer.go}                 | 28 ++++-----
 .../loadbalancer/rand/load_balancer_rand.go}       | 27 ++++-----
 .../loadbalancer/roundrobin/round_robin.go}        | 31 +++++-----
 pkg/config/config_load.go                          | 19 +-----
 pkg/model/cluster.go                               | 70 ++++++----------------
 pkg/model/lb.go                                    | 23 +++----
 pkg/server/cluster_manager.go                      | 22 ++++++-
 .../simple/triple/docker/docker-health-check.sh    |  3 +-
 12 files changed, 100 insertions(+), 141 deletions(-)

diff --git a/pkg/adapter/springcloud/cloud.go b/pkg/adapter/springcloud/cloud.go
index 52860b3..1e72fe6 100644
--- a/pkg/adapter/springcloud/cloud.go
+++ b/pkg/adapter/springcloud/cloud.go
@@ -293,8 +293,7 @@ func (a *CloudAdapter) fetchCompareAndSet() {
        // first remove the router for removed cluster
        for _, c := range oldStore.Config {
                if !newStore.HasCluster(c.Name) {
-                       delete := &model.Router{ID: c.Name}
-                       rm.DeleteRouter(delete)
+                       rm.DeleteRouter(&model.Router{ID: c.Name})
                }
        }
        // second set cluster
diff --git a/pkg/adapter/springcloud/servicediscovery/zookeeper/application_listener.go b/pkg/adapter/springcloud/servicediscovery/zookeeper/application_listener.go
index 4d89d0b..d0d16dc 100644
--- a/pkg/adapter/springcloud/servicediscovery/zookeeper/application_listener.go
+++ b/pkg/adapter/springcloud/servicediscovery/zookeeper/application_listener.go
@@ -79,7 +79,7 @@ func (z *zkAppListener) watch() {
                if err != nil {
                        failTimes++
                        logger.Infof("watching (path{%s}) = error{%v}", 
z.servicesPath, err)
-                       if err == gzk.ErrNilNode {
+                       if err == zk.ErrNoNode {
                                logger.Errorf("watching (path{%s}) got 
errNilNode,so exit listen", z.servicesPath)
                                return
                        }
diff --git a/pkg/adapter/springcloud/servicediscovery/zookeeper/service_listener.go b/pkg/adapter/springcloud/servicediscovery/zookeeper/service_listener.go
index 086f3be..a454e8f 100644
--- a/pkg/adapter/springcloud/servicediscovery/zookeeper/service_listener.go
+++ b/pkg/adapter/springcloud/servicediscovery/zookeeper/service_listener.go
@@ -63,10 +63,10 @@ func (asl *applicationServiceListener) WatchAndHandle() {
                if err != nil {
                        failTimes++
                        logger.Infof("watching (path{%s}) = error{%v}", 
asl.servicePath, err)
-                       if err == gzk.ErrNilChildren {
+                       if err == zk.ErrNoChildrenForEphemerals {
                                return
                        }
-                       if err == gzk.ErrNilNode {
+                       if err == zk.ErrNoNode {
                                logger.Errorf("watching (path{%s}) got 
errNilNode,so exit listen", asl.servicePath)
                                return
                        }
diff --git a/pkg/adapter/xds/cds.go b/pkg/adapter/xds/cds.go
index ff97ecc..dade74e 100644
--- a/pkg/adapter/xds/cds.go
+++ b/pkg/adapter/xds/cds.go
@@ -141,19 +141,18 @@ func (c *CdsManager) makeCluster(cluster *xdspb.Cluster) *model.Cluster {
                TypeStr:          cluster.TypeStr,
                Type:             c.makeClusterType(cluster),
                EdsClusterConfig: 
c.makeEdsClusterConfig(cluster.EdsClusterConfig),
-               LbStr:            cluster.LbStr,
-               Lb:               c.makeLoadBalancePolicy(cluster),
+               LbStr:            c.makeLoadBalancePolicy(cluster.LbStr),
                HealthChecks:     c.makeHealthChecks(cluster.HealthChecks),
                Endpoints:        c.makeEndpoints(cluster.Endpoints),
        }
 }
 
-func (c *CdsManager) makeLoadBalancePolicy(cluster *xdspb.Cluster) 
model.LbPolicy {
-       return model.LbPolicy(model.LbPolicyValue[cluster.LbStr])
+func (c *CdsManager) makeLoadBalancePolicy(lb string) model.LbPolicyType {
+       return model.LbPolicyTypeValue[lb]
 }
 
 func (c *CdsManager) makeClusterType(cluster *xdspb.Cluster) 
model.DiscoveryType {
-       return model.DiscoveryType(model.DiscoveryTypeValue[cluster.TypeStr])
+       return model.DiscoveryTypeValue[cluster.TypeStr]
 }
 
 func (c *CdsManager) makeEndpoints(endpoint *xdspb.Endpoint) []*model.Endpoint 
{
diff --git a/pkg/model/lb.go b/pkg/cluster/loadbalancer/load_balancer.go
similarity index 63%
copy from pkg/model/lb.go
copy to pkg/cluster/loadbalancer/load_balancer.go
index 028cee5..fb2e3e3 100644
--- a/pkg/model/lb.go
+++ b/pkg/cluster/loadbalancer/load_balancer.go
@@ -15,24 +15,22 @@
  * limitations under the License.
  */
 
-package model
+package loadbalancer
 
-// LbPolicy the load balance policy enum
-type LbPolicy int32
-
-const (
-       RoundRobin LbPolicy = 0
-       Rand       LbPolicy = 3
+import (
+       "github.com/apache/dubbo-go-pixiu/pkg/model"
 )
 
-// LbPolicyName key int32 for LbPolicy, value string
-var LbPolicyName = map[int32]string{
-       0: "RoundRobin",
-       3: "Rand",
+type LoadBalancer interface {
+       Handler(c *model.Cluster) *model.Endpoint
 }
 
-// LbPolicyValue key string, value int32 for LbPolicy
-var LbPolicyValue = map[string]int32{
-       "RoundRobin": 0,
-       "Rand":       3,
+// LoadBalancerStrategy load balancer strategy mode
+var LoadBalancerStrategy = map[model.LbPolicyType]LoadBalancer{}
+
+func RegisterLoadBalancer(name model.LbPolicyType, balancer LoadBalancer) {
+       if _, ok := LoadBalancerStrategy[name]; ok {
+               panic("load balancer register fail " + name)
+       }
+       LoadBalancerStrategy[name] = balancer
 }
diff --git a/pkg/model/lb.go b/pkg/cluster/loadbalancer/rand/load_balancer_rand.go
similarity index 68%
copy from pkg/model/lb.go
copy to pkg/cluster/loadbalancer/rand/load_balancer_rand.go
index 028cee5..1c7899c 100644
--- a/pkg/model/lb.go
+++ b/pkg/cluster/loadbalancer/rand/load_balancer_rand.go
@@ -15,24 +15,23 @@
  * limitations under the License.
  */
 
-package model
+package rand
 
-// LbPolicy the load balance policy enum
-type LbPolicy int32
+import (
+       "math/rand"
+)
 
-const (
-       RoundRobin LbPolicy = 0
-       Rand       LbPolicy = 3
+import (
+       "github.com/apache/dubbo-go-pixiu/pkg/cluster/loadbalancer"
+       "github.com/apache/dubbo-go-pixiu/pkg/model"
 )
 
-// LbPolicyName key int32 for LbPolicy, value string
-var LbPolicyName = map[int32]string{
-       0: "RoundRobin",
-       3: "Rand",
+func init() {
+       loadbalancer.RegisterLoadBalancer(model.LoadBalancerRand, Rand{})
 }
 
-// LbPolicyValue key string, value int32 for LbPolicy
-var LbPolicyValue = map[string]int32{
-       "RoundRobin": 0,
-       "Rand":       3,
+type Rand struct{}
+
+func (Rand) Handler(c *model.Cluster) *model.Endpoint {
+       return c.Endpoints[rand.Intn(len(c.Endpoints))]
 }
diff --git a/pkg/model/lb.go b/pkg/cluster/loadbalancer/roundrobin/round_robin.go
similarity index 60%
copy from pkg/model/lb.go
copy to pkg/cluster/loadbalancer/roundrobin/round_robin.go
index 028cee5..138b62f 100644
--- a/pkg/model/lb.go
+++ b/pkg/cluster/loadbalancer/roundrobin/round_robin.go
@@ -15,24 +15,25 @@
  * limitations under the License.
  */
 
-package model
+package roundrobin
 
-// LbPolicy the load balance policy enum
-type LbPolicy int32
-
-const (
-       RoundRobin LbPolicy = 0
-       Rand       LbPolicy = 3
+import (
+       "github.com/apache/dubbo-go-pixiu/pkg/cluster/loadbalancer"
+       "github.com/apache/dubbo-go-pixiu/pkg/model"
 )
 
-// LbPolicyName key int32 for LbPolicy, value string
-var LbPolicyName = map[int32]string{
-       0: "RoundRobin",
-       3: "Rand",
+func init() {
+       loadbalancer.RegisterLoadBalancer(model.LoadBalancerRoundRobin, 
RoundRobin{})
 }
 
-// LbPolicyValue key string, value int32 for LbPolicy
-var LbPolicyValue = map[string]int32{
-       "RoundRobin": 0,
-       "Rand":       3,
+type RoundRobin struct{}
+
+func (RoundRobin) Handler(c *model.Cluster) *model.Endpoint {
+       lens := len(c.Endpoints)
+       if c.PrePickEndpointIndex >= lens {
+               c.PrePickEndpointIndex = 0
+       }
+       e := c.Endpoints[c.PrePickEndpointIndex]
+       c.PrePickEndpointIndex = (c.PrePickEndpointIndex + 1) % lens
+       return e
 }
diff --git a/pkg/config/config_load.go b/pkg/config/config_load.go
index 870e1be..d0b5efb 100644
--- a/pkg/config/config_load.go
+++ b/pkg/config/config_load.go
@@ -150,21 +150,6 @@ func GetLoadBalance(cfg *model.Bootstrap) (err error) {
                logger.Error("Bootstrap configuration is null")
                return err
        }
-       var lbPolicy int32
-       for _, c := range cfg.StaticResources.Clusters {
-               flag := true
-               if c.TypeStr != "" {
-                       if t, ok := model.LbPolicyValue[c.LbStr]; ok {
-                               lbPolicy = t
-                               flag = false
-                       }
-               }
-               if flag {
-                       c.LbStr = constant.DefaultLoadBalanceType
-                       lbPolicy = model.LbPolicyValue[c.LbStr]
-               }
-               c.Lb = model.LbPolicy(lbPolicy)
-       }
        return nil
 }
 
@@ -173,7 +158,7 @@ func GetDiscoveryType(cfg *model.Bootstrap) (err error) {
                logger.Error("Bootstrap configuration is null")
                return err
        }
-       var discoveryType int32
+       var discoveryType model.DiscoveryType
        for _, c := range cfg.StaticResources.Clusters {
                flag := true
                if c.TypeStr != "" {
@@ -186,7 +171,7 @@ func GetDiscoveryType(cfg *model.Bootstrap) (err error) {
                        c.TypeStr = constant.DefaultDiscoveryType
                        discoveryType = model.DiscoveryTypeValue[c.TypeStr]
                }
-               c.Type = model.DiscoveryType(discoveryType)
+               c.Type = discoveryType
        }
        return nil
 }
diff --git a/pkg/model/cluster.go b/pkg/model/cluster.go
index c18580c..fdf4f22 100644
--- a/pkg/model/cluster.go
+++ b/pkg/model/cluster.go
@@ -17,12 +17,8 @@
 
 package model
 
-import (
-       "math/rand"
-)
-
 const (
-       Static DiscoveryType = 0 + iota
+       Static DiscoveryType = iota
        StrictDNS
        LogicalDns
        EDS
@@ -31,21 +27,21 @@ const (
 
 var (
        // DiscoveryTypeName
-       DiscoveryTypeName = map[int32]string{
-               0: "Static",
-               1: "StrictDNS",
-               2: "LogicalDns",
-               3: "EDS",
-               4: "OriginalDst",
+       DiscoveryTypeName = map[DiscoveryType]string{
+               Static:      "Static",
+               StrictDNS:   "StrictDNS",
+               LogicalDns:  "LogicalDns",
+               EDS:         "EDS",
+               OriginalDst: "OriginalDst",
        }
 
        // DiscoveryTypeValue
-       DiscoveryTypeValue = map[string]int32{
-               "Static":      0,
-               "StrictDNS":   1,
-               "LogicalDns":  2,
-               "EDS":         3,
-               "OriginalDst": 4,
+       DiscoveryTypeValue = map[string]DiscoveryType{
+               "Static":      Static,
+               "StrictDNS":   StrictDNS,
+               "LogicalDns":  LogicalDns,
+               "EDS":         EDS,
+               "OriginalDst": OriginalDst,
        }
 )
 
@@ -56,11 +52,10 @@ type (
                TypeStr              string           `yaml:"type" json:"type"` 
// Type the cluster discovery type string value
                Type                 DiscoveryType    `yaml:"-" json:"-"`       
// Type the cluster discovery type
                EdsClusterConfig     EdsClusterConfig 
`yaml:"eds_cluster_config" json:"eds_cluster_config" 
mapstructure:"eds_cluster_config"`
-               LbStr                string           `yaml:"lb_policy" 
json:"lb_policy"`   // Lb the cluster select node used loadBalance policy
-               Lb                   LbPolicy         `yaml:",omitempty" 
json:",omitempty"` // Lb the cluster select node used loadBalance policy
+               LbStr                LbPolicyType     `yaml:"lb_policy" 
json:"lb_policy"` // Lb the cluster select node used loadBalance policy
                HealthChecks         []HealthCheck    `yaml:"health_checks" 
json:"health_checks"`
                Endpoints            []*Endpoint      `yaml:"endpoints" 
json:"endpoints"`
-               prePickEndpointIndex int
+               PrePickEndpointIndex int
        }
 
        // EdsClusterConfig todo remove un-used EdsClusterConfig
@@ -83,36 +78,9 @@ type (
 
        // Endpoint
        Endpoint struct {
-               ID      string        `yaml:"ID" json:"ID"`     // ID indicate 
one endpoint
-               Name    string        `yaml:"name" json:"name"` // Name the 
cluster unique name
-               Address SocketAddress `yaml:"socket_address" 
json:"socket_address" mapstructure:"socket_address"`
-               // extra info such as label or other meta data
-               Metadata map[string]string `yaml:"meta" json:"meta"`
+               ID       string            `yaml:"ID" json:"ID"`                
                                       // ID indicate one endpoint
+               Name     string            `yaml:"name" json:"name"`            
                                       // Name the cluster unique name
+               Address  SocketAddress     `yaml:"socket_address" 
json:"socket_address" mapstructure:"socket_address"` // Address socket address
+               Metadata map[string]string `yaml:"meta" json:"meta"`            
                                       // Metadata extra info such as label or 
other meta data
        }
 )
-
-func (c *Cluster) PickOneEndpoint() *Endpoint {
-       // TODO: add lb strategy abstraction
-       if c.Endpoints == nil || len(c.Endpoints) == 0 {
-               return nil
-       }
-
-       if len(c.Endpoints) == 1 {
-               return c.Endpoints[0]
-       }
-
-       if c.Lb == Rand {
-               return c.Endpoints[rand.Intn(len(c.Endpoints))]
-       } else if c.Lb == RoundRobin {
-
-               lens := len(c.Endpoints)
-               if c.prePickEndpointIndex >= lens {
-                       c.prePickEndpointIndex = 0
-               }
-               e := c.Endpoints[c.prePickEndpointIndex]
-               c.prePickEndpointIndex = (c.prePickEndpointIndex + 1) % lens
-               return e
-       } else {
-               return c.Endpoints[rand.Intn(len(c.Endpoints))]
-       }
-}
diff --git a/pkg/model/lb.go b/pkg/model/lb.go
index 028cee5..32a4f9d 100644
--- a/pkg/model/lb.go
+++ b/pkg/model/lb.go
@@ -17,22 +17,17 @@
 
 package model
 
-// LbPolicy the load balance policy enum
-type LbPolicy int32
+// LbPolicyType the load balance policy enum
+type LbPolicyType string
 
 const (
-       RoundRobin LbPolicy = 0
-       Rand       LbPolicy = 3
+       LoadBalancerRand             LbPolicyType = "Rand"
+       LoadBalancerRoundRobin       LbPolicyType = "RoundRobin"
+       LoadBalanceConsistentHashing LbPolicyType = "ConsistentHashing"
 )
 
-// LbPolicyName key int32 for LbPolicy, value string
-var LbPolicyName = map[int32]string{
-       0: "RoundRobin",
-       3: "Rand",
-}
-
-// LbPolicyValue key string, value int32 for LbPolicy
-var LbPolicyValue = map[string]int32{
-       "RoundRobin": 0,
-       "Rand":       3,
+var LbPolicyTypeValue = map[string]LbPolicyType{
+       "Rand":              LoadBalancerRand,
+       "RoundRobin":        LoadBalancerRoundRobin,
+       "ConsistentHashing": LoadBalanceConsistentHashing,
 }
diff --git a/pkg/server/cluster_manager.go b/pkg/server/cluster_manager.go
index f659d94..9933ff5 100644
--- a/pkg/server/cluster_manager.go
+++ b/pkg/server/cluster_manager.go
@@ -23,6 +23,7 @@ import (
 )
 
 import (
+       "github.com/apache/dubbo-go-pixiu/pkg/cluster/loadbalancer"
        "github.com/apache/dubbo-go-pixiu/pkg/common/yaml"
        "github.com/apache/dubbo-go-pixiu/pkg/logger"
        "github.com/apache/dubbo-go-pixiu/pkg/model"
@@ -120,13 +121,28 @@ func (cm *ClusterManager) PickEndpoint(clusterName string) *model.Endpoint {
 
        for _, cluster := range cm.store.Config {
                if cluster.Name == clusterName {
-                       // according to lb to choose one endpoint, now only 
random
-                       return cluster.PickOneEndpoint()
+                       return pickOneEndpoint(cluster)
                }
        }
        return nil
 }
 
+func pickOneEndpoint(c *model.Cluster) *model.Endpoint {
+       if c.Endpoints == nil || len(c.Endpoints) == 0 {
+               return nil
+       }
+
+       if len(c.Endpoints) == 1 {
+               return c.Endpoints[0]
+       }
+
+       loadBalancer, ok := loadbalancer.LoadBalancerStrategy[c.LbStr]
+       if ok {
+               return loadBalancer.Handler(c)
+       }
+       return 
loadbalancer.LoadBalancerStrategy[model.LoadBalancerRand].Handler(c)
+}
+
 func (cm *ClusterManager) RemoveCluster(namesToDel []string) {
        cm.rw.Lock()
        defer cm.rw.Unlock()
@@ -194,7 +210,7 @@ func (s *ClusterStore) SetEndpoint(clusterName string, endpoint *model.Endpoint)
        }
 
        // cluster create
-       c := &model.Cluster{Name: clusterName, Lb: model.RoundRobin, Endpoints: 
[]*model.Endpoint{endpoint}}
+       c := &model.Cluster{Name: clusterName, LbStr: 
model.LoadBalancerRoundRobin, Endpoints: []*model.Endpoint{endpoint}}
        // not call AddCluster, because lock is not reenter
        s.Config = append(s.Config, c)
 }
diff --git a/samples/dubbogo/simple/triple/docker/docker-health-check.sh b/samples/dubbogo/simple/triple/docker/docker-health-check.sh
index 6e20e90..6345760 100644
--- a/samples/dubbogo/simple/triple/docker/docker-health-check.sh
+++ b/samples/dubbogo/simple/triple/docker/docker-health-check.sh
@@ -15,6 +15,5 @@
 #  limitations under the License.
 #
 
-sleep 5
+sleep 30
 curl http://127.0.0.1:8848/nacos/v1/console/health/liveness
-sleep 10
\ No newline at end of file

Reply via email to