This is an automated email from the ASF dual-hosted git repository.
alexstocks pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/dubbo-go-pixiu.git
The following commit(s) were added to refs/heads/develop by this push:
new 8a01f686 Add Maglev hashing LB algorithm (#554)
8a01f686 is described below
commit 8a01f686cf3f6e261580793c6807dc014a637638
Author: sh2 <[email protected]>
AuthorDate: Tue May 16 15:10:54 2023 +0800
Add Maglev hashing LB algorithm (#554)
* Add load balancing algorithm Maglev
* update consistent hash load balancing to a more generic type
* outsource consistent hash init function
* add support for maglev hash
* add test for maglev hash
* finish maglev hash lb algorithm
* fix import format
* fix review
* improve consistency for maglev hash
* add maglev and ring hash to plugin registry, fix a bug in rand lb
* add default table size option for maglev hash and using random hash func
* withdraw rand range and fix test
---
pixiu/pkg/cluster/loadbalancer/load_balancer.go | 7 +
.../consistent_hash.go => maglev/maglev_hash.go} | 40 ++-
.../maglev_hash_test.go} | 14 +-
.../pkg/cluster/loadbalancer/maglev/permutation.go | 288 +++++++++++++++++++++
.../loadbalancer/maglev/permutation_test.go | 278 ++++++++++++++++++++
.../consistent_hash.go => ringhash/ring_hash.go} | 43 ++-
.../ring_hash_test.go} | 12 +-
pixiu/pkg/model/cluster.go | 44 +---
pixiu/pkg/model/lb.go | 28 +-
pixiu/pkg/pluginregistry/registry.go | 2 +
pixiu/pkg/server/cluster_manager.go | 10 +-
11 files changed, 695 insertions(+), 71 deletions(-)
diff --git a/pixiu/pkg/cluster/loadbalancer/load_balancer.go
b/pixiu/pkg/cluster/loadbalancer/load_balancer.go
index 969a1175..3c40f759 100644
--- a/pixiu/pkg/cluster/loadbalancer/load_balancer.go
+++ b/pixiu/pkg/cluster/loadbalancer/load_balancer.go
@@ -34,3 +34,10 @@ func RegisterLoadBalancer(name model.LbPolicyType, balancer
LoadBalancer) {
}
LoadBalancerStrategy[name] = balancer
}
+
+func RegisterConsistentHashInit(name model.LbPolicyType, function
model.ConsistentHashInitFunc) {
+ if _, ok := model.ConsistentHashInitMap[name]; ok {
+ panic("consistent hash load balancer register fail " + name)
+ }
+ model.ConsistentHashInitMap[name] = function
+}
diff --git a/pixiu/pkg/cluster/loadbalancer/hash/consistent_hash.go
b/pixiu/pkg/cluster/loadbalancer/maglev/maglev_hash.go
similarity index 50%
copy from pixiu/pkg/cluster/loadbalancer/hash/consistent_hash.go
copy to pixiu/pkg/cluster/loadbalancer/maglev/maglev_hash.go
index f56d6275..72d7b10f 100644
--- a/pixiu/pkg/cluster/loadbalancer/hash/consistent_hash.go
+++ b/pixiu/pkg/cluster/loadbalancer/maglev/maglev_hash.go
@@ -15,36 +15,52 @@
* limitations under the License.
*/
-package consistent
-
-import (
- "fmt"
-)
+package maglev
import (
"github.com/apache/dubbo-go-pixiu/pixiu/pkg/cluster/loadbalancer"
+
"github.com/apache/dubbo-go-pixiu/pixiu/pkg/cluster/loadbalancer/ringhash"
+ "github.com/apache/dubbo-go-pixiu/pixiu/pkg/logger"
"github.com/apache/dubbo-go-pixiu/pixiu/pkg/model"
)
func init() {
- loadbalancer.RegisterLoadBalancer(model.LoadBalanceConsistentHashing,
ConsistentHashing{})
+ loadbalancer.RegisterLoadBalancer(model.LoadBalancerMaglevHashing,
MaglevHash{})
+
loadbalancer.RegisterConsistentHashInit(model.LoadBalancerMaglevHashing,
NewMaglevHash)
}
-type ConsistentHashing struct{}
+func NewMaglevHash(config model.ConsistentHash, endpoints []*model.Endpoint)
model.LbConsistentHash {
+ hosts := make([]string, len(endpoints))
+ for i, endpoint := range endpoints {
+ hosts[i] = endpoint.GetHost()
+ }
+
+ h, err := NewLookUpTable(config.MaglevTableSize, hosts)
+ if err == nil {
+ h.Populate()
+ return h
+ }
+
+ logger.Infof("[dubbo-go-pixiu] maglev hash load balancing fail: %v,
using ring hash instead", err)
+ if config.ReplicaNum == 0 {
+ config.ReplicaNum = 2 * len(endpoints)
+ }
+ return ringhash.NewRingHash(config, endpoints)
+}
-func (ConsistentHashing) Handler(c *model.ClusterConfig, policy
model.LbPolicy) *model.Endpoint {
- u := c.Hash.ConsistentHash.Hash(policy.GenerateHash())
+type MaglevHash struct{}
- hash, err := c.Hash.ConsistentHash.GetHash(u)
+func (m MaglevHash) Handler(c *model.ClusterConfig, policy model.LbPolicy)
*model.Endpoint {
+ dst, err := c.ConsistentHash.Hash.Get(policy.GenerateHash())
if err != nil {
+ logger.Warnf("[dubbo-go-pixiu] error of getting from maglev
hash: %v", err)
return nil
}
endpoints := c.GetEndpoint(true)
for _, endpoint := range endpoints {
- address := endpoint.Address
- if fmt.Sprintf("%s:%d", address.Address, address.Port) == hash {
+ if endpoint.GetHost() == dst {
return endpoint
}
}
diff --git a/pixiu/pkg/cluster/loadbalancer/hash/consistent_hash_test.go
b/pixiu/pkg/cluster/loadbalancer/maglev/maglev_hash_test.go
similarity index 84%
copy from pixiu/pkg/cluster/loadbalancer/hash/consistent_hash_test.go
copy to pixiu/pkg/cluster/loadbalancer/maglev/maglev_hash_test.go
index 158a895f..a28124ff 100644
--- a/pixiu/pkg/cluster/loadbalancer/hash/consistent_hash_test.go
+++ b/pixiu/pkg/cluster/loadbalancer/maglev/maglev_hash_test.go
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package consistent
+package maglev
import (
"fmt"
@@ -29,7 +29,7 @@ import (
"github.com/apache/dubbo-go-pixiu/pixiu/pkg/model"
)
-func TestHashRing(t *testing.T) {
+func TestMaglevHash(t *testing.T) {
nodeCount := 5
@@ -41,12 +41,16 @@ func TestHashRing(t *testing.T) {
Address: model.SocketAddress{Address: "192.168.1." +
name, Port: 1000 + i}})
}
- cluster := &model.ClusterConfig{Name: "cluster1", Endpoints: nodes,
- LbStr: model.LoadBalanceConsistentHashing, Hash:
model.Hash{ReplicaNum: 10, MaxVnodeNum: 1023}}
+ cluster := &model.ClusterConfig{
+ Name: "test-cluster",
+ Endpoints: nodes,
+ LbStr: model.LoadBalancerMaglevHashing,
+ ConsistentHash: model.ConsistentHash{MaglevTableSize: 521},
+ }
cluster.CreateConsistentHash()
var (
- hashing = ConsistentHashing{}
+ hashing = MaglevHash{}
path string
)
diff --git a/pixiu/pkg/cluster/loadbalancer/maglev/permutation.go
b/pixiu/pkg/cluster/loadbalancer/maglev/permutation.go
new file mode 100644
index 00000000..e65de6e4
--- /dev/null
+++ b/pixiu/pkg/cluster/loadbalancer/maglev/permutation.go
@@ -0,0 +1,288 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package maglev
+
+import (
+ "encoding/binary"
+ "hash/maphash"
+ "math"
+ "math/big"
+ "sync"
+)
+
+import (
+ "github.com/pkg/errors"
+ "golang.org/x/crypto/blake2b"
+ "golang.org/x/crypto/sha3"
+)
+
+// Default table sizes support a maximum of 10,000 endpoints.
+var defaultTableSize = []int{
+ 307, 503, 1511, 2503, 5099, 7877,
+ 10007, 14207, 20903, 40423, 88721,
+ 123911, 164821, 199999, 218077, 299777,
+ 344941, 399989, 463031, 514399, 604613,
+ 686669, 726337, 799789, 857011, 999983,
+}
+
+type permutation struct {
+ pos []uint32
+ next int
+ index int
+ hit int
+}
+
+type LookUpTable struct {
+ slots []string
+ permutations []*permutation
+ buckets map[int]string
+ endpointNum int
+ size int
+ sync.RWMutex
+}
+
+func NewLookUpTable(tableSize int, hosts []string) (*LookUpTable, error) {
+ expectN := len(hosts) * 100
+ if tableSize == 0 {
+		// find closest table size
+ for _, size := range defaultTableSize {
+ if expectN < size {
+ tableSize = size
+ break
+ }
+ }
+ }
+
+ if tableSize < expectN &&
!big.NewInt(0).SetUint64(uint64(tableSize)).ProbablyPrime(1) {
+ return nil, errors.Errorf("table size should be a prime number
that greater than at least "+
+ "100 times of endpoints size, but got %d instead",
tableSize)
+ }
+
+ buckets := make(map[int]string, len(hosts))
+ for i, host := range hosts {
+ buckets[i] = host
+ }
+ n := len(buckets)
+
+ return &LookUpTable{
+ buckets: buckets,
+ endpointNum: n,
+ size: tableSize,
+ }, nil
+}
+
+// Populate builds the Maglev hashing lookup table.
+func (t *LookUpTable) Populate() {
+ t.Lock()
+ defer t.Unlock()
+
+ t.generatePerms()
+ t.populate()
+}
+
+func (t *LookUpTable) populate() {
+ t.slots = make([]string, t.size)
+
+ full, miss := 0, 0
+ for miss < t.endpointNum && full < t.size {
+ for _, p := range t.permutations {
+ if p.next == t.size {
+ continue
+ }
+ start := p.next
+ for start < t.size && len(t.slots[p.pos[start]]) > 0 {
+ start++
+ }
+ if start < t.size {
+ t.slots[p.pos[start]] = t.buckets[p.index]
+ p.hit++
+ full++
+ } else {
+ miss++
+ }
+ p.next = start
+ }
+ }
+
+	// Fill the empty slots with the endpoint that currently holds the fewest slots.
+ if full != t.size && miss > 0 {
+ t.fillMissingSlots()
+ }
+}
+
+func (t *LookUpTable) fillMissingSlots() {
+ var minP *permutation
+ minHit := math.MaxInt
+ for _, p := range t.permutations {
+ if p.hit < minHit {
+ minHit = p.hit
+ minP = p
+ }
+ }
+ for i, s := range t.slots {
+ if len(s) == 0 {
+ t.slots[i] = t.buckets[minP.index]
+ minP.hit++
+ }
+ }
+}
+
+func (t *LookUpTable) generatePerms() {
+ t.permutations = make([]*permutation, 0, t.endpointNum)
+
+ for i, b := range t.buckets {
+ t.generatePerm(b, i)
+ }
+}
+
+func (t *LookUpTable) generatePerm(bucket string, i int) {
+ var offs, skip, j uint32
+
+ m := uint32(t.size)
+ pos := make([]uint32, m)
+ offs = _hash1(bucket) % m
+ skip = _hash2(bucket)%(m-1) + 1
+ for j = 0; j < m; j++ {
+ pos[j] = (offs + j*skip) % m
+ }
+ t.permutations = append(t.permutations, &permutation{pos, 0, i, 0})
+}
+
+func (t *LookUpTable) resetPerms() {
+ for _, p := range t.permutations {
+ p.next = 0
+ p.hit = 0
+ }
+}
+
+func (t *LookUpTable) removePerm(dst int) {
+ del := -1
+ for i, p := range t.permutations {
+ if p.index == dst {
+ del = i
+ break
+ }
+ }
+ if del != -1 {
+ t.permutations[del] = nil
+ t.permutations = append(t.permutations[:del],
t.permutations[del+1:]...)
+ }
+}
+
+// Hash the input key. NOTE(review): maphash.MakeSeed creates a new random seed on every call, so the same key hashes to a different value each time — confirm this is intended, since it makes Get non-deterministic across calls.
+func (t *LookUpTable) Hash(key string) uint32 {
+ var h maphash.Hash
+ h.SetSeed(maphash.MakeSeed())
+ h.WriteString(key)
+ return uint32(h.Sum64())
+}
+
+// Get a slot by hashing the input key.
+func (t *LookUpTable) Get(key string) (string, error) {
+ t.RLock()
+ defer t.RUnlock()
+
+ if len(t.slots) == 0 {
+ return "", errors.New("no slot initialized, please call
Populate() before Get()")
+ }
+
+ if t.size == 0 || t.endpointNum == 0 {
+ return "", errors.New("no host added")
+ }
+
+ dst := t.Hash(key) % uint32(t.size)
+ return t.slots[dst], nil
+}
+
+// GetHash a slot by a hashed key.
+func (t *LookUpTable) GetHash(key uint32) (string, error) {
+ t.RLock()
+ defer t.RUnlock()
+
+ if len(t.slots) == 0 {
+ return "", errors.New("no slot initialized, please call
Populate() before Get()")
+ }
+
+ if t.size == 0 || t.endpointNum == 0 {
+ return "", errors.New("no host added")
+ }
+
+ return t.slots[key], nil
+}
+
+// Add one endpoint into lookup table.
+func (t *LookUpTable) Add(host string) {
+ t.Lock()
+ defer t.Unlock()
+
+ t.add(host)
+}
+
+func (t *LookUpTable) add(host string) {
+ dst := 0
+ for i, bucket := range t.buckets {
+ if bucket == host {
+ return
+ }
+ if i > dst {
+ dst = i
+ }
+ }
+ t.buckets[dst+1] = host
+ t.endpointNum++
+ t.resetPerms()
+ t.generatePerm(host, dst+1)
+ t.populate()
+}
+
+// Remove one endpoint from lookup table.
+func (t *LookUpTable) Remove(host string) bool {
+ t.Lock()
+ defer t.Unlock()
+
+ return t.remove(host)
+}
+
+func (t *LookUpTable) remove(host string) bool {
+ if t.endpointNum <= 0 {
+ return false
+ }
+
+ for i, bucket := range t.buckets {
+ if bucket == host {
+ delete(t.buckets, i)
+ t.endpointNum--
+ t.removePerm(i)
+ t.resetPerms()
+ t.populate()
+ return true
+ }
+ }
+
+ return false
+}
+
+func _hash1(key string) uint32 {
+ out := sha3.Sum512([]byte(key))
+ return binary.LittleEndian.Uint32(out[:])
+}
+
+func _hash2(key string) uint32 {
+ out := blake2b.Sum512([]byte(key))
+ return binary.LittleEndian.Uint32(out[:])
+}
diff --git a/pixiu/pkg/cluster/loadbalancer/maglev/permutation_test.go
b/pixiu/pkg/cluster/loadbalancer/maglev/permutation_test.go
new file mode 100644
index 00000000..4bda48f8
--- /dev/null
+++ b/pixiu/pkg/cluster/loadbalancer/maglev/permutation_test.go
@@ -0,0 +1,278 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package maglev
+
+import (
+ "fmt"
+ "testing"
+)
+
+import (
+ "github.com/stretchr/testify/assert"
+)
+
+const ConsistencyToleration = 3
+
+func WithLookUpTable(tableSize, nodeCount int, function func(*LookUpTable)) {
+ table := createTableWithNodes(tableSize, nodeCount)
+ table.Populate()
+ if function != nil {
+ function(table)
+ }
+}
+
+func createTableWithNodes(tableSize, nodeCount int) *LookUpTable {
+ nodes := make([]string, 0, nodeCount)
+ for i := 1; i <= nodeCount; i++ {
+ nodes = append(nodes, createEndpoint(i))
+ }
+ table, _ := NewLookUpTable(tableSize, nodes)
+ return table
+}
+
+func createEndpoint(id int) string {
+ return fmt.Sprintf("192.168.1.%d:%d", id, 1000+id)
+}
+
+func TestCreateLookUpTableWithoutTableSize(t *testing.T) {
+ testCases := []struct {
+ nodeCount int
+ expectSize int
+ }{
+ {
+ nodeCount: 0,
+ expectSize: 307,
+ },
+ {
+ nodeCount: 1,
+ expectSize: 307,
+ },
+ {
+ nodeCount: 3,
+ expectSize: 307,
+ },
+ {
+ nodeCount: 5,
+ expectSize: 503,
+ },
+ {
+ nodeCount: 15,
+ expectSize: 1511,
+ },
+ {
+ nodeCount: 100001,
+ expectSize: 0,
+ },
+ }
+ for i, tc := range testCases {
+ table := createTableWithNodes(0, tc.nodeCount)
+ if i == 5 {
+ assert.Nil(t, table, "Got non nil table")
+ } else {
+ assert.Equal(t, tc.expectSize, table.size, "Wrong
default table size")
+ }
+ }
+}
+
+func TestLookUpTable_Populate(t *testing.T) {
+ testCases := []struct {
+ nodeCount int
+ size int
+ }{
+ {
+ nodeCount: 0,
+ size: 11,
+ },
+ {
+ nodeCount: 3,
+ size: 313,
+ },
+ {
+ nodeCount: 5,
+ size: 503,
+ },
+ {
+ nodeCount: 10,
+ size: 1013,
+ },
+ {
+ nodeCount: 50,
+ size: 5021,
+ },
+ {
+ nodeCount: 250,
+ size: 25447,
+ },
+ }
+
+ for _, tc := range testCases {
+ WithLookUpTable(tc.size, tc.nodeCount, func(table *LookUpTable)
{
+ checkUnPopulateSlot(t, table)
+ checkMissingEndpoint(t, table)
+ checkConsistency(t, table)
+ })
+ }
+}
+
+func TestLookUpTable_Get(t *testing.T) {
+ tableSize, nodeCount := 1033, 10
+ key := "/this/is/a/test"
+
+ table := createTableWithNodes(tableSize, nodeCount)
+
+ _, err := table.Get(key)
+ assert.NotNil(t, err, "Got endpoint before populating")
+
+ table.Populate()
+
+ ep, err := table.Get(key)
+ assert.Nil(t, err, "Fail to get endpoint")
+ assert.NotEqual(t, "", ep, "Wrong endpoint")
+}
+
+func TestLookUpTable_Add(t *testing.T) {
+ testCases := []struct {
+ nodeCount int
+ size int
+ add []int
+ }{
+ {
+ nodeCount: 0,
+ size: 11,
+ add: []int{1, 2, 3, 4, 5},
+ },
+ {
+ nodeCount: 3,
+ size: 313,
+ add: []int{4, 5, 6},
+ },
+ {
+ nodeCount: 50,
+ size: 5021,
+ add: []int{51, 52, 53, 54},
+ },
+ }
+
+ for _, tc := range testCases {
+ WithLookUpTable(tc.size, tc.nodeCount, func(table *LookUpTable)
{
+ for _, add := range tc.add {
+ ep := createEndpoint(add)
+ table.Add(ep)
+ checkUnPopulateSlot(t, table)
+ checkMissingEndpoint(t, table)
+ checkConsistency(t, table)
+ }
+ })
+ }
+}
+
+func TestLookUpTable_Remove(t *testing.T) {
+ testCases := []struct {
+ nodeCount int
+ size int
+ delete []int
+ wanted []bool
+ }{
+ {
+ nodeCount: 0,
+ size: 11,
+ delete: []int{2},
+ wanted: []bool{false},
+ },
+ {
+ nodeCount: 1,
+ size: 101,
+ delete: []int{2, 1, 2},
+ wanted: []bool{false, true, false},
+ },
+ {
+ nodeCount: 3,
+ size: 307,
+ delete: []int{1, 2, 3, 4},
+ wanted: []bool{true, true, true, false},
+ },
+ {
+ nodeCount: 10,
+ size: 1013,
+ delete: []int{4, 5, 6, 11},
+ wanted: []bool{true, true, true, false},
+ },
+ {
+ nodeCount: 50,
+ size: 5021,
+ delete: []int{54, 15, 1, 59, 5, 1},
+ wanted: []bool{false, true, true, false, true,
false},
+ },
+ }
+
+ for id, tc := range testCases {
+ WithLookUpTable(tc.size, tc.nodeCount, func(table *LookUpTable)
{
+ for i, del := range tc.delete {
+ ep := createEndpoint(del)
+ ret := table.Remove(ep)
+ assert.Equal(t, tc.wanted[i], ret,
+ "Wrong state removing: %s in case %d",
del, id)
+ checkUnPopulateSlot(t, table)
+ checkMissingEndpoint(t, table)
+ checkEndpointNotIn(t, table, ep)
+ checkConsistency(t, table)
+ }
+ })
+ }
+}
+
+func checkUnPopulateSlot(t *testing.T, table *LookUpTable) {
+ if table.endpointNum == 0 {
+ return
+ }
+
+ for i, s := range table.slots {
+ assert.NotEqual(t, "", s,
+ "Unpopulating slot in table(node=%d, factor=%d) at
slot: %d", table.endpointNum, table.size, i)
+ }
+}
+
+func checkMissingEndpoint(t *testing.T, table *LookUpTable) {
+ for _, p := range table.permutations {
+ assert.Equal(t, true, p.hit > 0,
+ "Missing endpoint: %s in table(node=%d, factor=%d)",
table.slots[p.index], table.endpointNum, table.size)
+ }
+}
+
+func checkEndpointNotIn(t *testing.T, table *LookUpTable, endpoint string) {
+ for i, slot := range table.slots {
+ assert.NotEqual(t, endpoint, slot,
+ "Unexpect endpoint %s in slot %d", endpoint, i)
+ }
+}
+
+func checkConsistency(t *testing.T, table *LookUpTable) {
+ if table.endpointNum == 0 {
+ return
+ }
+
+ avg := table.size / table.endpointNum
+ dist := make(map[string]int)
+ for _, slot := range table.slots {
+ dist[slot]++
+ }
+ for k, v := range dist {
+ assert.NotEqual(t, true, v > avg+ConsistencyToleration || v <
avg-ConsistencyToleration,
+ "%s with distributions %d not in %d +/- %d", k, v, avg,
ConsistencyToleration)
+ }
+}
diff --git a/pixiu/pkg/cluster/loadbalancer/hash/consistent_hash.go
b/pixiu/pkg/cluster/loadbalancer/ringhash/ring_hash.go
similarity index 50%
rename from pixiu/pkg/cluster/loadbalancer/hash/consistent_hash.go
rename to pixiu/pkg/cluster/loadbalancer/ringhash/ring_hash.go
index f56d6275..13b59ab7 100644
--- a/pixiu/pkg/cluster/loadbalancer/hash/consistent_hash.go
+++ b/pixiu/pkg/cluster/loadbalancer/ringhash/ring_hash.go
@@ -15,36 +15,61 @@
* limitations under the License.
*/
-package consistent
+package ringhash
import (
- "fmt"
+ "math"
+)
+
+import (
+ "github.com/dubbogo/gost/hash/consistent"
)
import (
"github.com/apache/dubbo-go-pixiu/pixiu/pkg/cluster/loadbalancer"
+ "github.com/apache/dubbo-go-pixiu/pixiu/pkg/logger"
"github.com/apache/dubbo-go-pixiu/pixiu/pkg/model"
)
func init() {
- loadbalancer.RegisterLoadBalancer(model.LoadBalanceConsistentHashing,
ConsistentHashing{})
+ loadbalancer.RegisterLoadBalancer(model.LoadBalancerRingHashing,
RingHashing{})
+ loadbalancer.RegisterConsistentHashInit(model.LoadBalancerRingHashing,
NewRingHash)
}
-type ConsistentHashing struct{}
+func NewRingHash(config model.ConsistentHash, endpoints []*model.Endpoint)
model.LbConsistentHash {
+ var ops []consistent.Option
+
+ if config.ReplicaNum != 0 {
+ ops = append(ops, consistent.WithReplicaNum(config.ReplicaNum))
+ }
+
+ if config.MaxVnodeNum != 0 {
+ ops = append(ops,
consistent.WithMaxVnodeNum(int(config.MaxVnodeNum)))
+ } else {
+ config.MaxVnodeNum = math.MinInt32
+ }
+
+ h := consistent.NewConsistentHash(ops...)
+ for _, endpoint := range endpoints {
+ h.Add(endpoint.GetHost())
+ }
+ return h
+}
-func (ConsistentHashing) Handler(c *model.ClusterConfig, policy
model.LbPolicy) *model.Endpoint {
- u := c.Hash.ConsistentHash.Hash(policy.GenerateHash())
+type RingHashing struct{}
- hash, err := c.Hash.ConsistentHash.GetHash(u)
+func (r RingHashing) Handler(c *model.ClusterConfig, policy model.LbPolicy)
*model.Endpoint {
+ u := c.ConsistentHash.Hash.Hash(policy.GenerateHash())
+ hash, err := c.ConsistentHash.Hash.GetHash(u)
if err != nil {
+ logger.Warnf("[dubbo-go-pixiu] error of getting from ring hash:
%v", err)
return nil
}
endpoints := c.GetEndpoint(true)
for _, endpoint := range endpoints {
- address := endpoint.Address
- if fmt.Sprintf("%s:%d", address.Address, address.Port) == hash {
+ if endpoint.GetHost() == hash {
return endpoint
}
}
diff --git a/pixiu/pkg/cluster/loadbalancer/hash/consistent_hash_test.go
b/pixiu/pkg/cluster/loadbalancer/ringhash/ring_hash_test.go
similarity index 85%
rename from pixiu/pkg/cluster/loadbalancer/hash/consistent_hash_test.go
rename to pixiu/pkg/cluster/loadbalancer/ringhash/ring_hash_test.go
index 158a895f..ea8ceded 100644
--- a/pixiu/pkg/cluster/loadbalancer/hash/consistent_hash_test.go
+++ b/pixiu/pkg/cluster/loadbalancer/ringhash/ring_hash_test.go
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package consistent
+package ringhash
import (
"fmt"
@@ -41,12 +41,16 @@ func TestHashRing(t *testing.T) {
Address: model.SocketAddress{Address: "192.168.1." +
name, Port: 1000 + i}})
}
- cluster := &model.ClusterConfig{Name: "cluster1", Endpoints: nodes,
- LbStr: model.LoadBalanceConsistentHashing, Hash:
model.Hash{ReplicaNum: 10, MaxVnodeNum: 1023}}
+ cluster := &model.ClusterConfig{
+ Name: "cluster1",
+ Endpoints: nodes,
+ LbStr: model.LoadBalancerRingHashing,
+ ConsistentHash: model.ConsistentHash{ReplicaNum: 10,
MaxVnodeNum: 1023},
+ }
cluster.CreateConsistentHash()
var (
- hashing = ConsistentHashing{}
+ hashing = RingHashing{}
path string
)
diff --git a/pixiu/pkg/model/cluster.go b/pixiu/pkg/model/cluster.go
index 6239c1c6..2040305e 100644
--- a/pixiu/pkg/model/cluster.go
+++ b/pixiu/pkg/model/cluster.go
@@ -19,11 +19,6 @@ package model
import (
"fmt"
- "math"
-)
-
-import (
- "github.com/dubbogo/gost/hash/consistent"
)
const (
@@ -62,7 +57,7 @@ type (
Type DiscoveryType `yaml:"-" json:"-"`
// Type the cluster discovery type
EdsClusterConfig EdsClusterConfig
`yaml:"eds_cluster_config" json:"eds_cluster_config"
mapstructure:"eds_cluster_config"`
LbStr LbPolicyType `yaml:"lb_policy"
json:"lb_policy"` // Lb the cluster select node used loadBalance policy
- Hash Hash `yaml:"consistent"
json:"consistent"` // Consistent hash config info
+ ConsistentHash ConsistentHash `yaml:"consistent"
json:"consistent"` // Consistent hash config info
HealthChecks []HealthCheckConfig `yaml:"health_checks"
json:"health_checks"`
Endpoints []*Endpoint `yaml:"endpoints"
json:"endpoints"`
PrePickEndpointIndex int
@@ -100,10 +95,12 @@ type (
UnHealthy bool
}
- Hash struct {
- ReplicaNum int `yaml:"replica_num" json:"replica_num"`
- MaxVnodeNum int32 `yaml:"max_vnode_num" json:"max_vnode_num"`
- ConsistentHash *consistent.Consistent
+ // ConsistentHash methods include: RingHash, MaglevHash
+ ConsistentHash struct {
+ ReplicaNum int `yaml:"replica_num" json:"replica_num"`
+ MaxVnodeNum int32 `yaml:"max_vnode_num"
json:"max_vnode_num"`
+ MaglevTableSize int `yaml:"maglev_table_size"
json:"maglev_table_size"`
+ Hash LbConsistentHash
}
)
@@ -118,26 +115,13 @@ func (c *ClusterConfig) GetEndpoint(mustHealth bool)
[]*Endpoint {
return endpoints
}
+// CreateConsistentHash creates consistent hashing algorithms for Load Balance.
func (c *ClusterConfig) CreateConsistentHash() {
- if c.LbStr == LoadBalanceConsistentHashing {
- var ops []consistent.Option
-
- if c.Hash.ReplicaNum != 0 {
- ops = append(ops,
consistent.WithReplicaNum(c.Hash.ReplicaNum))
- }
-
- if c.Hash.MaxVnodeNum != 0 {
- ops = append(ops,
consistent.WithMaxVnodeNum(int(c.Hash.MaxVnodeNum)))
- } else {
- c.Hash.MaxVnodeNum = math.MinInt32
- }
-
- h := consistent.NewConsistentHash(ops...)
-
- for _, endpoint := range c.Endpoints {
- address := endpoint.Address
- h.Add(fmt.Sprintf("%s:%d", address.Address,
address.Port))
- }
- c.Hash.ConsistentHash = h
+ if newConsistentHash, ok := ConsistentHashInitMap[c.LbStr]; ok {
+ c.ConsistentHash.Hash = newConsistentHash(c.ConsistentHash,
c.Endpoints)
}
}
+
+func (e Endpoint) GetHost() string {
+ return fmt.Sprintf("%s:%d", e.Address.Address, e.Address.Port)
+}
diff --git a/pixiu/pkg/model/lb.go b/pixiu/pkg/model/lb.go
index f41b03f2..906fb086 100644
--- a/pixiu/pkg/model/lb.go
+++ b/pixiu/pkg/model/lb.go
@@ -21,17 +21,33 @@ package model
type LbPolicyType string
const (
- LoadBalancerRand LbPolicyType = "Rand"
- LoadBalancerRoundRobin LbPolicyType = "RoundRobin"
- LoadBalanceConsistentHashing LbPolicyType = "ConsistentHashing"
+ LoadBalancerRand LbPolicyType = "Rand"
+ LoadBalancerRoundRobin LbPolicyType = "RoundRobin"
+ LoadBalancerRingHashing LbPolicyType = "RingHashing"
+ LoadBalancerMaglevHashing LbPolicyType = "MaglevHashing"
)
var LbPolicyTypeValue = map[string]LbPolicyType{
- "Rand": LoadBalancerRand,
- "RoundRobin": LoadBalancerRoundRobin,
- "ConsistentHashing": LoadBalanceConsistentHashing,
+ "Rand": LoadBalancerRand,
+ "RoundRobin": LoadBalancerRoundRobin,
+ "RingHashing": LoadBalancerRingHashing,
+ "MaglevHashing": LoadBalancerMaglevHashing,
}
type LbPolicy interface {
GenerateHash() string
}
+
+// LbConsistentHash supports consistent hash load balancing
+type LbConsistentHash interface {
+ Hash(key string) uint32
+ Add(host string)
+ Get(key string) (string, error)
+ GetHash(key uint32) (string, error)
+ Remove(host string) bool
+}
+
+type ConsistentHashInitFunc = func(ConsistentHash, []*Endpoint)
LbConsistentHash
+
+// ConsistentHashInitMap stores the Init functions for consistent hash load
balancing
+var ConsistentHashInitMap = map[LbPolicyType]ConsistentHashInitFunc{}
diff --git a/pixiu/pkg/pluginregistry/registry.go
b/pixiu/pkg/pluginregistry/registry.go
index 9078473b..c3446f43 100644
--- a/pixiu/pkg/pluginregistry/registry.go
+++ b/pixiu/pkg/pluginregistry/registry.go
@@ -20,7 +20,9 @@ package pluginregistry
import (
_ "github.com/apache/dubbo-go-pixiu/pixiu/pkg/adapter/dubboregistry"
_ "github.com/apache/dubbo-go-pixiu/pixiu/pkg/adapter/springcloud"
+ _
"github.com/apache/dubbo-go-pixiu/pixiu/pkg/cluster/loadbalancer/maglev"
_ "github.com/apache/dubbo-go-pixiu/pixiu/pkg/cluster/loadbalancer/rand"
+ _
"github.com/apache/dubbo-go-pixiu/pixiu/pkg/cluster/loadbalancer/ringhash"
_
"github.com/apache/dubbo-go-pixiu/pixiu/pkg/cluster/loadbalancer/roundrobin"
_ "github.com/apache/dubbo-go-pixiu/pixiu/pkg/filter/accesslog"
_ "github.com/apache/dubbo-go-pixiu/pixiu/pkg/filter/auth/jwt"
diff --git a/pixiu/pkg/server/cluster_manager.go
b/pixiu/pkg/server/cluster_manager.go
index 49f5694b..78793618 100644
--- a/pixiu/pkg/server/cluster_manager.go
+++ b/pixiu/pkg/server/cluster_manager.go
@@ -259,8 +259,8 @@ func (s *ClusterStore) SetEndpoint(clusterName string,
endpoint *model.Endpoint)
// endpoint create
c.Endpoints = append(c.Endpoints, endpoint)
cluster.AddEndpoint(endpoint)
- if c.Hash.ConsistentHash != nil {
-
c.Hash.ConsistentHash.Add(endpoint.Address.Address)
+ if c.ConsistentHash.Hash != nil {
+ c.ConsistentHash.Hash.Add(endpoint.GetHost())
}
return
}
@@ -278,8 +278,8 @@ func (s *ClusterStore) DeleteEndpoint(clusterName string,
endpointID string) {
if e.ID == endpointID {
cluster.RemoveEndpoint(e)
c.Endpoints = append(c.Endpoints[:i],
c.Endpoints[i+1:]...)
- if c.Hash.ConsistentHash != nil {
-
c.Hash.ConsistentHash.Remove(e.Address.Address)
+ if c.ConsistentHash.Hash != nil {
+
c.ConsistentHash.Hash.Remove(e.GetHost())
}
return
}
@@ -288,7 +288,7 @@ func (s *ClusterStore) DeleteEndpoint(clusterName string,
endpointID string) {
return
}
}
- logger.Warnf("not found cluster %s", clusterName)
+ logger.Warnf("not found cluster %s", clusterName)
}
func (s *ClusterStore) HasCluster(clusterName string) bool {