This is an automated email from the ASF dual-hosted git repository.
ztelur pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/dubbo-go-pixiu.git
The following commit(s) were added to refs/heads/develop by this push:
new a27282e feat:load balancer strategy mode and add consistent hash (#373)
a27282e is described below
commit a27282efe858dd557af94f4eef4b72329839dd01
Author: baerwang <[email protected]>
AuthorDate: Fri Mar 25 16:15:29 2022 +0800
feat:load balancer strategy mode and add consistent hash (#373)
* feat:load balancer strategy mode and add consistent hash
* fix triple samples ci
* fix:go fmt
* style:use gost library consistent hash
* style:temporary delete consistent hash wait etcd new version upgrade add it back
Co-authored-by: mark4z <[email protected]>
---
pkg/adapter/springcloud/cloud.go | 3 +-
.../zookeeper/application_listener.go | 2 +-
.../servicediscovery/zookeeper/service_listener.go | 4 +-
pkg/adapter/xds/cds.go | 9 ++-
.../loadbalancer/load_balancer.go} | 28 ++++-----
.../loadbalancer/rand/load_balancer_rand.go} | 27 ++++-----
.../loadbalancer/roundrobin/round_robin.go} | 31 +++++-----
pkg/config/config_load.go | 19 +-----
pkg/model/cluster.go | 70 ++++++----------------
pkg/model/lb.go | 23 +++----
pkg/server/cluster_manager.go | 22 ++++++-
.../simple/triple/docker/docker-health-check.sh | 3 +-
12 files changed, 100 insertions(+), 141 deletions(-)
diff --git a/pkg/adapter/springcloud/cloud.go b/pkg/adapter/springcloud/cloud.go
index 52860b3..1e72fe6 100644
--- a/pkg/adapter/springcloud/cloud.go
+++ b/pkg/adapter/springcloud/cloud.go
@@ -293,8 +293,7 @@ func (a *CloudAdapter) fetchCompareAndSet() {
// first remove the router for removed cluster
for _, c := range oldStore.Config {
if !newStore.HasCluster(c.Name) {
- delete := &model.Router{ID: c.Name}
- rm.DeleteRouter(delete)
+ rm.DeleteRouter(&model.Router{ID: c.Name})
}
}
// second set cluster
diff --git a/pkg/adapter/springcloud/servicediscovery/zookeeper/application_listener.go b/pkg/adapter/springcloud/servicediscovery/zookeeper/application_listener.go
index 4d89d0b..d0d16dc 100644
--- a/pkg/adapter/springcloud/servicediscovery/zookeeper/application_listener.go
+++ b/pkg/adapter/springcloud/servicediscovery/zookeeper/application_listener.go
@@ -79,7 +79,7 @@ func (z *zkAppListener) watch() {
if err != nil {
failTimes++
logger.Infof("watching (path{%s}) = error{%v}",
z.servicesPath, err)
- if err == gzk.ErrNilNode {
+ if err == zk.ErrNoNode {
logger.Errorf("watching (path{%s}) got
errNilNode,so exit listen", z.servicesPath)
return
}
diff --git a/pkg/adapter/springcloud/servicediscovery/zookeeper/service_listener.go b/pkg/adapter/springcloud/servicediscovery/zookeeper/service_listener.go
index 086f3be..a454e8f 100644
--- a/pkg/adapter/springcloud/servicediscovery/zookeeper/service_listener.go
+++ b/pkg/adapter/springcloud/servicediscovery/zookeeper/service_listener.go
@@ -63,10 +63,10 @@ func (asl *applicationServiceListener) WatchAndHandle() {
if err != nil {
failTimes++
logger.Infof("watching (path{%s}) = error{%v}",
asl.servicePath, err)
- if err == gzk.ErrNilChildren {
+ if err == zk.ErrNoChildrenForEphemerals {
return
}
- if err == gzk.ErrNilNode {
+ if err == zk.ErrNoNode {
logger.Errorf("watching (path{%s}) got
errNilNode,so exit listen", asl.servicePath)
return
}
diff --git a/pkg/adapter/xds/cds.go b/pkg/adapter/xds/cds.go
index ff97ecc..dade74e 100644
--- a/pkg/adapter/xds/cds.go
+++ b/pkg/adapter/xds/cds.go
@@ -141,19 +141,18 @@ func (c *CdsManager) makeCluster(cluster *xdspb.Cluster) *model.Cluster {
TypeStr: cluster.TypeStr,
Type: c.makeClusterType(cluster),
EdsClusterConfig:
c.makeEdsClusterConfig(cluster.EdsClusterConfig),
- LbStr: cluster.LbStr,
- Lb: c.makeLoadBalancePolicy(cluster),
+ LbStr: c.makeLoadBalancePolicy(cluster.LbStr),
HealthChecks: c.makeHealthChecks(cluster.HealthChecks),
Endpoints: c.makeEndpoints(cluster.Endpoints),
}
}
-func (c *CdsManager) makeLoadBalancePolicy(cluster *xdspb.Cluster)
model.LbPolicy {
- return model.LbPolicy(model.LbPolicyValue[cluster.LbStr])
+func (c *CdsManager) makeLoadBalancePolicy(lb string) model.LbPolicyType {
+ return model.LbPolicyTypeValue[lb]
}
func (c *CdsManager) makeClusterType(cluster *xdspb.Cluster)
model.DiscoveryType {
- return model.DiscoveryType(model.DiscoveryTypeValue[cluster.TypeStr])
+ return model.DiscoveryTypeValue[cluster.TypeStr]
}
func (c *CdsManager) makeEndpoints(endpoint *xdspb.Endpoint) []*model.Endpoint
{
diff --git a/pkg/model/lb.go b/pkg/cluster/loadbalancer/load_balancer.go
similarity index 63%
copy from pkg/model/lb.go
copy to pkg/cluster/loadbalancer/load_balancer.go
index 028cee5..fb2e3e3 100644
--- a/pkg/model/lb.go
+++ b/pkg/cluster/loadbalancer/load_balancer.go
@@ -15,24 +15,22 @@
* limitations under the License.
*/
-package model
+package loadbalancer
-// LbPolicy the load balance policy enum
-type LbPolicy int32
-
-const (
- RoundRobin LbPolicy = 0
- Rand LbPolicy = 3
+import (
+ "github.com/apache/dubbo-go-pixiu/pkg/model"
)
-// LbPolicyName key int32 for LbPolicy, value string
-var LbPolicyName = map[int32]string{
- 0: "RoundRobin",
- 3: "Rand",
+type LoadBalancer interface {
+ Handler(c *model.Cluster) *model.Endpoint
}
-// LbPolicyValue key string, value int32 for LbPolicy
-var LbPolicyValue = map[string]int32{
- "RoundRobin": 0,
- "Rand": 3,
+// LoadBalancerStrategy load balancer strategy mode
+var LoadBalancerStrategy = map[model.LbPolicyType]LoadBalancer{}
+
+func RegisterLoadBalancer(name model.LbPolicyType, balancer LoadBalancer) {
+ if _, ok := LoadBalancerStrategy[name]; ok {
+ panic("load balancer register fail " + name)
+ }
+ LoadBalancerStrategy[name] = balancer
}
diff --git a/pkg/model/lb.go b/pkg/cluster/loadbalancer/rand/load_balancer_rand.go
similarity index 68%
copy from pkg/model/lb.go
copy to pkg/cluster/loadbalancer/rand/load_balancer_rand.go
index 028cee5..1c7899c 100644
--- a/pkg/model/lb.go
+++ b/pkg/cluster/loadbalancer/rand/load_balancer_rand.go
@@ -15,24 +15,23 @@
* limitations under the License.
*/
-package model
+package rand
-// LbPolicy the load balance policy enum
-type LbPolicy int32
+import (
+ "math/rand"
+)
-const (
- RoundRobin LbPolicy = 0
- Rand LbPolicy = 3
+import (
+ "github.com/apache/dubbo-go-pixiu/pkg/cluster/loadbalancer"
+ "github.com/apache/dubbo-go-pixiu/pkg/model"
)
-// LbPolicyName key int32 for LbPolicy, value string
-var LbPolicyName = map[int32]string{
- 0: "RoundRobin",
- 3: "Rand",
+func init() {
+ loadbalancer.RegisterLoadBalancer(model.LoadBalancerRand, Rand{})
}
-// LbPolicyValue key string, value int32 for LbPolicy
-var LbPolicyValue = map[string]int32{
- "RoundRobin": 0,
- "Rand": 3,
+type Rand struct{}
+
+func (Rand) Handler(c *model.Cluster) *model.Endpoint {
+ return c.Endpoints[rand.Intn(len(c.Endpoints))]
}
diff --git a/pkg/model/lb.go b/pkg/cluster/loadbalancer/roundrobin/round_robin.go
similarity index 60%
copy from pkg/model/lb.go
copy to pkg/cluster/loadbalancer/roundrobin/round_robin.go
index 028cee5..138b62f 100644
--- a/pkg/model/lb.go
+++ b/pkg/cluster/loadbalancer/roundrobin/round_robin.go
@@ -15,24 +15,25 @@
* limitations under the License.
*/
-package model
+package roundrobin
-// LbPolicy the load balance policy enum
-type LbPolicy int32
-
-const (
- RoundRobin LbPolicy = 0
- Rand LbPolicy = 3
+import (
+ "github.com/apache/dubbo-go-pixiu/pkg/cluster/loadbalancer"
+ "github.com/apache/dubbo-go-pixiu/pkg/model"
)
-// LbPolicyName key int32 for LbPolicy, value string
-var LbPolicyName = map[int32]string{
- 0: "RoundRobin",
- 3: "Rand",
+func init() {
+ loadbalancer.RegisterLoadBalancer(model.LoadBalancerRoundRobin,
RoundRobin{})
}
-// LbPolicyValue key string, value int32 for LbPolicy
-var LbPolicyValue = map[string]int32{
- "RoundRobin": 0,
- "Rand": 3,
+type RoundRobin struct{}
+
+func (RoundRobin) Handler(c *model.Cluster) *model.Endpoint {
+ lens := len(c.Endpoints)
+ if c.PrePickEndpointIndex >= lens {
+ c.PrePickEndpointIndex = 0
+ }
+ e := c.Endpoints[c.PrePickEndpointIndex]
+ c.PrePickEndpointIndex = (c.PrePickEndpointIndex + 1) % lens
+ return e
}
diff --git a/pkg/config/config_load.go b/pkg/config/config_load.go
index 870e1be..d0b5efb 100644
--- a/pkg/config/config_load.go
+++ b/pkg/config/config_load.go
@@ -150,21 +150,6 @@ func GetLoadBalance(cfg *model.Bootstrap) (err error) {
logger.Error("Bootstrap configuration is null")
return err
}
- var lbPolicy int32
- for _, c := range cfg.StaticResources.Clusters {
- flag := true
- if c.TypeStr != "" {
- if t, ok := model.LbPolicyValue[c.LbStr]; ok {
- lbPolicy = t
- flag = false
- }
- }
- if flag {
- c.LbStr = constant.DefaultLoadBalanceType
- lbPolicy = model.LbPolicyValue[c.LbStr]
- }
- c.Lb = model.LbPolicy(lbPolicy)
- }
return nil
}
@@ -173,7 +158,7 @@ func GetDiscoveryType(cfg *model.Bootstrap) (err error) {
logger.Error("Bootstrap configuration is null")
return err
}
- var discoveryType int32
+ var discoveryType model.DiscoveryType
for _, c := range cfg.StaticResources.Clusters {
flag := true
if c.TypeStr != "" {
@@ -186,7 +171,7 @@ func GetDiscoveryType(cfg *model.Bootstrap) (err error) {
c.TypeStr = constant.DefaultDiscoveryType
discoveryType = model.DiscoveryTypeValue[c.TypeStr]
}
- c.Type = model.DiscoveryType(discoveryType)
+ c.Type = discoveryType
}
return nil
}
diff --git a/pkg/model/cluster.go b/pkg/model/cluster.go
index c18580c..fdf4f22 100644
--- a/pkg/model/cluster.go
+++ b/pkg/model/cluster.go
@@ -17,12 +17,8 @@
package model
-import (
- "math/rand"
-)
-
const (
- Static DiscoveryType = 0 + iota
+ Static DiscoveryType = iota
StrictDNS
LogicalDns
EDS
@@ -31,21 +27,21 @@ const (
var (
// DiscoveryTypeName
- DiscoveryTypeName = map[int32]string{
- 0: "Static",
- 1: "StrictDNS",
- 2: "LogicalDns",
- 3: "EDS",
- 4: "OriginalDst",
+ DiscoveryTypeName = map[DiscoveryType]string{
+ Static: "Static",
+ StrictDNS: "StrictDNS",
+ LogicalDns: "LogicalDns",
+ EDS: "EDS",
+ OriginalDst: "OriginalDst",
}
// DiscoveryTypeValue
- DiscoveryTypeValue = map[string]int32{
- "Static": 0,
- "StrictDNS": 1,
- "LogicalDns": 2,
- "EDS": 3,
- "OriginalDst": 4,
+ DiscoveryTypeValue = map[string]DiscoveryType{
+ "Static": Static,
+ "StrictDNS": StrictDNS,
+ "LogicalDns": LogicalDns,
+ "EDS": EDS,
+ "OriginalDst": OriginalDst,
}
)
@@ -56,11 +52,10 @@ type (
TypeStr string `yaml:"type" json:"type"`
// Type the cluster discovery type string value
Type DiscoveryType `yaml:"-" json:"-"`
// Type the cluster discovery type
EdsClusterConfig EdsClusterConfig
`yaml:"eds_cluster_config" json:"eds_cluster_config"
mapstructure:"eds_cluster_config"`
- LbStr string `yaml:"lb_policy"
json:"lb_policy"` // Lb the cluster select node used loadBalance policy
- Lb LbPolicy `yaml:",omitempty"
json:",omitempty"` // Lb the cluster select node used loadBalance policy
+ LbStr LbPolicyType `yaml:"lb_policy"
json:"lb_policy"` // Lb the cluster select node used loadBalance policy
HealthChecks []HealthCheck `yaml:"health_checks"
json:"health_checks"`
Endpoints []*Endpoint `yaml:"endpoints"
json:"endpoints"`
- prePickEndpointIndex int
+ PrePickEndpointIndex int
}
// EdsClusterConfig todo remove un-used EdsClusterConfig
@@ -83,36 +78,9 @@ type (
// Endpoint
Endpoint struct {
- ID string `yaml:"ID" json:"ID"` // ID indicate
one endpoint
- Name string `yaml:"name" json:"name"` // Name the
cluster unique name
- Address SocketAddress `yaml:"socket_address"
json:"socket_address" mapstructure:"socket_address"`
- // extra info such as label or other meta data
- Metadata map[string]string `yaml:"meta" json:"meta"`
+ ID string `yaml:"ID" json:"ID"`
// ID indicate one endpoint
+ Name string `yaml:"name" json:"name"`
// Name the cluster unique name
+ Address SocketAddress `yaml:"socket_address"
json:"socket_address" mapstructure:"socket_address"` // Address socket address
+ Metadata map[string]string `yaml:"meta" json:"meta"`
// Metadata extra info such as label or
other meta data
}
)
-
-func (c *Cluster) PickOneEndpoint() *Endpoint {
- // TODO: add lb strategy abstraction
- if c.Endpoints == nil || len(c.Endpoints) == 0 {
- return nil
- }
-
- if len(c.Endpoints) == 1 {
- return c.Endpoints[0]
- }
-
- if c.Lb == Rand {
- return c.Endpoints[rand.Intn(len(c.Endpoints))]
- } else if c.Lb == RoundRobin {
-
- lens := len(c.Endpoints)
- if c.prePickEndpointIndex >= lens {
- c.prePickEndpointIndex = 0
- }
- e := c.Endpoints[c.prePickEndpointIndex]
- c.prePickEndpointIndex = (c.prePickEndpointIndex + 1) % lens
- return e
- } else {
- return c.Endpoints[rand.Intn(len(c.Endpoints))]
- }
-}
diff --git a/pkg/model/lb.go b/pkg/model/lb.go
index 028cee5..32a4f9d 100644
--- a/pkg/model/lb.go
+++ b/pkg/model/lb.go
@@ -17,22 +17,17 @@
package model
-// LbPolicy the load balance policy enum
-type LbPolicy int32
+// LbPolicyType the load balance policy enum
+type LbPolicyType string
const (
- RoundRobin LbPolicy = 0
- Rand LbPolicy = 3
+ LoadBalancerRand LbPolicyType = "Rand"
+ LoadBalancerRoundRobin LbPolicyType = "RoundRobin"
+ LoadBalanceConsistentHashing LbPolicyType = "ConsistentHashing"
)
-// LbPolicyName key int32 for LbPolicy, value string
-var LbPolicyName = map[int32]string{
- 0: "RoundRobin",
- 3: "Rand",
-}
-
-// LbPolicyValue key string, value int32 for LbPolicy
-var LbPolicyValue = map[string]int32{
- "RoundRobin": 0,
- "Rand": 3,
+var LbPolicyTypeValue = map[string]LbPolicyType{
+ "Rand": LoadBalancerRand,
+ "RoundRobin": LoadBalancerRoundRobin,
+ "ConsistentHashing": LoadBalanceConsistentHashing,
}
diff --git a/pkg/server/cluster_manager.go b/pkg/server/cluster_manager.go
index f659d94..9933ff5 100644
--- a/pkg/server/cluster_manager.go
+++ b/pkg/server/cluster_manager.go
@@ -23,6 +23,7 @@ import (
)
import (
+ "github.com/apache/dubbo-go-pixiu/pkg/cluster/loadbalancer"
"github.com/apache/dubbo-go-pixiu/pkg/common/yaml"
"github.com/apache/dubbo-go-pixiu/pkg/logger"
"github.com/apache/dubbo-go-pixiu/pkg/model"
@@ -120,13 +121,28 @@ func (cm *ClusterManager) PickEndpoint(clusterName string) *model.Endpoint {
for _, cluster := range cm.store.Config {
if cluster.Name == clusterName {
- // according to lb to choose one endpoint, now only
random
- return cluster.PickOneEndpoint()
+ return pickOneEndpoint(cluster)
}
}
return nil
}
+func pickOneEndpoint(c *model.Cluster) *model.Endpoint {
+ if c.Endpoints == nil || len(c.Endpoints) == 0 {
+ return nil
+ }
+
+ if len(c.Endpoints) == 1 {
+ return c.Endpoints[0]
+ }
+
+ loadBalancer, ok := loadbalancer.LoadBalancerStrategy[c.LbStr]
+ if ok {
+ return loadBalancer.Handler(c)
+ }
+ return
loadbalancer.LoadBalancerStrategy[model.LoadBalancerRand].Handler(c)
+}
+
func (cm *ClusterManager) RemoveCluster(namesToDel []string) {
cm.rw.Lock()
defer cm.rw.Unlock()
@@ -194,7 +210,7 @@ func (s *ClusterStore) SetEndpoint(clusterName string, endpoint *model.Endpoint)
}
// cluster create
- c := &model.Cluster{Name: clusterName, Lb: model.RoundRobin, Endpoints:
[]*model.Endpoint{endpoint}}
+ c := &model.Cluster{Name: clusterName, LbStr:
model.LoadBalancerRoundRobin, Endpoints: []*model.Endpoint{endpoint}}
// not call AddCluster, because lock is not reenter
s.Config = append(s.Config, c)
}
diff --git a/samples/dubbogo/simple/triple/docker/docker-health-check.sh b/samples/dubbogo/simple/triple/docker/docker-health-check.sh
index 6e20e90..6345760 100644
--- a/samples/dubbogo/simple/triple/docker/docker-health-check.sh
+++ b/samples/dubbogo/simple/triple/docker/docker-health-check.sh
@@ -15,6 +15,5 @@
# limitations under the License.
#
-sleep 5
+sleep 30
curl http://127.0.0.1:8848/nacos/v1/console/health/liveness
-sleep 10
\ No newline at end of file