This is an automated email from the ASF dual-hosted git repository.

alexstocks pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/dubbo-go-pixiu.git


The following commit(s) were added to refs/heads/develop by this push:
     new 8a01f686 Add Maglev hashing LB algorithm (#554)
8a01f686 is described below

commit 8a01f686cf3f6e261580793c6807dc014a637638
Author: sh2 <[email protected]>
AuthorDate: Tue May 16 15:10:54 2023 +0800

    Add Maglev hashing LB algorithm (#554)
    
    * Add load balancing algorithm Maglev
    
    * update consistent hash load balancing to a more generic type
    
    * outsource consistent hash init function
    
    * add support for maglev hash
    
    * add test for maglev hash
    
    * finish maglev hash lb algorithm
    
    * fix import format
    
    * fix review
    
    * improve consistency for maglev hash
    
    * add maglev and ring hash to plugin registry, fix a bug in rand lb
    
    * add default table size option for maglev hash and using random hash func
    
    * withdraw rand range and fix test
---
 pixiu/pkg/cluster/loadbalancer/load_balancer.go    |   7 +
 .../consistent_hash.go => maglev/maglev_hash.go}   |  40 ++-
 .../maglev_hash_test.go}                           |  14 +-
 .../pkg/cluster/loadbalancer/maglev/permutation.go | 288 +++++++++++++++++++++
 .../loadbalancer/maglev/permutation_test.go        | 278 ++++++++++++++++++++
 .../consistent_hash.go => ringhash/ring_hash.go}   |  43 ++-
 .../ring_hash_test.go}                             |  12 +-
 pixiu/pkg/model/cluster.go                         |  44 +---
 pixiu/pkg/model/lb.go                              |  28 +-
 pixiu/pkg/pluginregistry/registry.go               |   2 +
 pixiu/pkg/server/cluster_manager.go                |  10 +-
 11 files changed, 695 insertions(+), 71 deletions(-)

diff --git a/pixiu/pkg/cluster/loadbalancer/load_balancer.go 
b/pixiu/pkg/cluster/loadbalancer/load_balancer.go
index 969a1175..3c40f759 100644
--- a/pixiu/pkg/cluster/loadbalancer/load_balancer.go
+++ b/pixiu/pkg/cluster/loadbalancer/load_balancer.go
@@ -34,3 +34,10 @@ func RegisterLoadBalancer(name model.LbPolicyType, balancer 
LoadBalancer) {
        }
        LoadBalancerStrategy[name] = balancer
 }
+
+func RegisterConsistentHashInit(name model.LbPolicyType, function 
model.ConsistentHashInitFunc) {
+       if _, ok := model.ConsistentHashInitMap[name]; ok {
+               panic("consistent hash load balancer register fail " + name)
+       }
+       model.ConsistentHashInitMap[name] = function
+}
diff --git a/pixiu/pkg/cluster/loadbalancer/hash/consistent_hash.go 
b/pixiu/pkg/cluster/loadbalancer/maglev/maglev_hash.go
similarity index 50%
copy from pixiu/pkg/cluster/loadbalancer/hash/consistent_hash.go
copy to pixiu/pkg/cluster/loadbalancer/maglev/maglev_hash.go
index f56d6275..72d7b10f 100644
--- a/pixiu/pkg/cluster/loadbalancer/hash/consistent_hash.go
+++ b/pixiu/pkg/cluster/loadbalancer/maglev/maglev_hash.go
@@ -15,36 +15,52 @@
  * limitations under the License.
  */
 
-package consistent
-
-import (
-       "fmt"
-)
+package maglev
 
 import (
        "github.com/apache/dubbo-go-pixiu/pixiu/pkg/cluster/loadbalancer"
+       
"github.com/apache/dubbo-go-pixiu/pixiu/pkg/cluster/loadbalancer/ringhash"
+       "github.com/apache/dubbo-go-pixiu/pixiu/pkg/logger"
        "github.com/apache/dubbo-go-pixiu/pixiu/pkg/model"
 )
 
 func init() {
-       loadbalancer.RegisterLoadBalancer(model.LoadBalanceConsistentHashing, 
ConsistentHashing{})
+       loadbalancer.RegisterLoadBalancer(model.LoadBalancerMaglevHashing, 
MaglevHash{})
+       
loadbalancer.RegisterConsistentHashInit(model.LoadBalancerMaglevHashing, 
NewMaglevHash)
 }
 
-type ConsistentHashing struct{}
+func NewMaglevHash(config model.ConsistentHash, endpoints []*model.Endpoint) 
model.LbConsistentHash {
+       hosts := make([]string, len(endpoints))
+       for i, endpoint := range endpoints {
+               hosts[i] = endpoint.GetHost()
+       }
+
+       h, err := NewLookUpTable(config.MaglevTableSize, hosts)
+       if err == nil {
+               h.Populate()
+               return h
+       }
+
+       logger.Infof("[dubbo-go-pixiu] maglev hash load balancing fail: %v, 
using ring hash instead", err)
+       if config.ReplicaNum == 0 {
+               config.ReplicaNum = 2 * len(endpoints)
+       }
+       return ringhash.NewRingHash(config, endpoints)
+}
 
-func (ConsistentHashing) Handler(c *model.ClusterConfig, policy 
model.LbPolicy) *model.Endpoint {
-       u := c.Hash.ConsistentHash.Hash(policy.GenerateHash())
+type MaglevHash struct{}
 
-       hash, err := c.Hash.ConsistentHash.GetHash(u)
+func (m MaglevHash) Handler(c *model.ClusterConfig, policy model.LbPolicy) 
*model.Endpoint {
+       dst, err := c.ConsistentHash.Hash.Get(policy.GenerateHash())
        if err != nil {
+               logger.Warnf("[dubbo-go-pixiu] error of getting from maglev 
hash: %v", err)
                return nil
        }
 
        endpoints := c.GetEndpoint(true)
 
        for _, endpoint := range endpoints {
-               address := endpoint.Address
-               if fmt.Sprintf("%s:%d", address.Address, address.Port) == hash {
+               if endpoint.GetHost() == dst {
                        return endpoint
                }
        }
diff --git a/pixiu/pkg/cluster/loadbalancer/hash/consistent_hash_test.go 
b/pixiu/pkg/cluster/loadbalancer/maglev/maglev_hash_test.go
similarity index 84%
copy from pixiu/pkg/cluster/loadbalancer/hash/consistent_hash_test.go
copy to pixiu/pkg/cluster/loadbalancer/maglev/maglev_hash_test.go
index 158a895f..a28124ff 100644
--- a/pixiu/pkg/cluster/loadbalancer/hash/consistent_hash_test.go
+++ b/pixiu/pkg/cluster/loadbalancer/maglev/maglev_hash_test.go
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package consistent
+package maglev
 
 import (
        "fmt"
@@ -29,7 +29,7 @@ import (
        "github.com/apache/dubbo-go-pixiu/pixiu/pkg/model"
 )
 
-func TestHashRing(t *testing.T) {
+func TestMaglevHash(t *testing.T) {
 
        nodeCount := 5
 
@@ -41,12 +41,16 @@ func TestHashRing(t *testing.T) {
                        Address: model.SocketAddress{Address: "192.168.1." + 
name, Port: 1000 + i}})
        }
 
-       cluster := &model.ClusterConfig{Name: "cluster1", Endpoints: nodes,
-               LbStr: model.LoadBalanceConsistentHashing, Hash: 
model.Hash{ReplicaNum: 10, MaxVnodeNum: 1023}}
+       cluster := &model.ClusterConfig{
+               Name:           "test-cluster",
+               Endpoints:      nodes,
+               LbStr:          model.LoadBalancerMaglevHashing,
+               ConsistentHash: model.ConsistentHash{MaglevTableSize: 521},
+       }
        cluster.CreateConsistentHash()
 
        var (
-               hashing = ConsistentHashing{}
+               hashing = MaglevHash{}
                path    string
        )
 
diff --git a/pixiu/pkg/cluster/loadbalancer/maglev/permutation.go 
b/pixiu/pkg/cluster/loadbalancer/maglev/permutation.go
new file mode 100644
index 00000000..e65de6e4
--- /dev/null
+++ b/pixiu/pkg/cluster/loadbalancer/maglev/permutation.go
@@ -0,0 +1,288 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package maglev
+
+import (
+       "encoding/binary"
+       "hash/maphash"
+       "math"
+       "math/big"
+       "sync"
+)
+
+import (
+       "github.com/pkg/errors"
+       "golang.org/x/crypto/blake2b"
+       "golang.org/x/crypto/sha3"
+)
+
+// Default table sizes, supporting a maximum of 10,000 endpoints.
+var defaultTableSize = []int{
+       307, 503, 1511, 2503, 5099, 7877,
+       10007, 14207, 20903, 40423, 88721,
+       123911, 164821, 199999, 218077, 299777,
+       344941, 399989, 463031, 514399, 604613,
+       686669, 726337, 799789, 857011, 999983,
+}
+
+type permutation struct {
+       pos   []uint32
+       next  int
+       index int
+       hit   int
+}
+
+type LookUpTable struct {
+       slots        []string
+       permutations []*permutation
+       buckets      map[int]string
+       endpointNum  int
+       size         int
+       sync.RWMutex
+}
+
+func NewLookUpTable(tableSize int, hosts []string) (*LookUpTable, error) {
+       expectN := len(hosts) * 100
+       if tableSize == 0 {
+               // find closest table size
+               for _, size := range defaultTableSize {
+                       if expectN < size {
+                               tableSize = size
+                               break
+                       }
+               }
+       }
+
+       if tableSize < expectN && 
!big.NewInt(0).SetUint64(uint64(tableSize)).ProbablyPrime(1) {
+               return nil, errors.Errorf("table size should be a prime number 
that greater than at least "+
+                       "100 times of endpoints size, but got %d instead", 
tableSize)
+       }
+
+       buckets := make(map[int]string, len(hosts))
+       for i, host := range hosts {
+               buckets[i] = host
+       }
+       n := len(buckets)
+
+       return &LookUpTable{
+               buckets:     buckets,
+               endpointNum: n,
+               size:        tableSize,
+       }, nil
+}
+
+// Populate the Maglev hashing lookup table.
+func (t *LookUpTable) Populate() {
+       t.Lock()
+       defer t.Unlock()
+
+       t.generatePerms()
+       t.populate()
+}
+
+func (t *LookUpTable) populate() {
+       t.slots = make([]string, t.size)
+
+       full, miss := 0, 0
+       for miss < t.endpointNum && full < t.size {
+               for _, p := range t.permutations {
+                       if p.next == t.size {
+                               continue
+                       }
+                       start := p.next
+                       for start < t.size && len(t.slots[p.pos[start]]) > 0 {
+                               start++
+                       }
+                       if start < t.size {
+                               t.slots[p.pos[start]] = t.buckets[p.index]
+                               p.hit++
+                               full++
+                       } else {
+                               miss++
+                       }
+                       p.next = start
+               }
+       }
+
+       // Fill the empty slots with the least placed Endpoint.
+       if full != t.size && miss > 0 {
+               t.fillMissingSlots()
+       }
+}
+
+func (t *LookUpTable) fillMissingSlots() {
+       var minP *permutation
+       minHit := math.MaxInt
+       for _, p := range t.permutations {
+               if p.hit < minHit {
+                       minHit = p.hit
+                       minP = p
+               }
+       }
+       for i, s := range t.slots {
+               if len(s) == 0 {
+                       t.slots[i] = t.buckets[minP.index]
+                       minP.hit++
+               }
+       }
+}
+
+func (t *LookUpTable) generatePerms() {
+       t.permutations = make([]*permutation, 0, t.endpointNum)
+
+       for i, b := range t.buckets {
+               t.generatePerm(b, i)
+       }
+}
+
+func (t *LookUpTable) generatePerm(bucket string, i int) {
+       var offs, skip, j uint32
+
+       m := uint32(t.size)
+       pos := make([]uint32, m)
+       offs = _hash1(bucket) % m
+       skip = _hash2(bucket)%(m-1) + 1
+       for j = 0; j < m; j++ {
+               pos[j] = (offs + j*skip) % m
+       }
+       t.permutations = append(t.permutations, &permutation{pos, 0, i, 0})
+}
+
+func (t *LookUpTable) resetPerms() {
+       for _, p := range t.permutations {
+               p.next = 0
+               p.hit = 0
+       }
+}
+
+func (t *LookUpTable) removePerm(dst int) {
+       del := -1
+       for i, p := range t.permutations {
+               if p.index == dst {
+                       del = i
+                       break
+               }
+       }
+       if del != -1 {
+               t.permutations[del] = nil
+               t.permutations = append(t.permutations[:del], 
t.permutations[del+1:]...)
+       }
+}
+
+// Hash the input key.
+func (t *LookUpTable) Hash(key string) uint32 {
+       var h maphash.Hash
+       h.SetSeed(maphash.MakeSeed())
+       h.WriteString(key)
+       return uint32(h.Sum64())
+}
+
+// Get a slot by hashing the input key.
+func (t *LookUpTable) Get(key string) (string, error) {
+       t.RLock()
+       defer t.RUnlock()
+
+       if len(t.slots) == 0 {
+               return "", errors.New("no slot initialized, please call 
Populate() before Get()")
+       }
+
+       if t.size == 0 || t.endpointNum == 0 {
+               return "", errors.New("no host added")
+       }
+
+       dst := t.Hash(key) % uint32(t.size)
+       return t.slots[dst], nil
+}
+
+// GetHash gets a slot by a hashed key.
+func (t *LookUpTable) GetHash(key uint32) (string, error) {
+       t.RLock()
+       defer t.RUnlock()
+
+       if len(t.slots) == 0 {
+               return "", errors.New("no slot initialized, please call 
Populate() before Get()")
+       }
+
+       if t.size == 0 || t.endpointNum == 0 {
+               return "", errors.New("no host added")
+       }
+
+       return t.slots[key], nil
+}
+
+// Add one endpoint into lookup table.
+func (t *LookUpTable) Add(host string) {
+       t.Lock()
+       defer t.Unlock()
+
+       t.add(host)
+}
+
+func (t *LookUpTable) add(host string) {
+       dst := 0
+       for i, bucket := range t.buckets {
+               if bucket == host {
+                       return
+               }
+               if i > dst {
+                       dst = i
+               }
+       }
+       t.buckets[dst+1] = host
+       t.endpointNum++
+       t.resetPerms()
+       t.generatePerm(host, dst+1)
+       t.populate()
+}
+
+// Remove one endpoint from lookup table.
+func (t *LookUpTable) Remove(host string) bool {
+       t.Lock()
+       defer t.Unlock()
+
+       return t.remove(host)
+}
+
+func (t *LookUpTable) remove(host string) bool {
+       if t.endpointNum <= 0 {
+               return false
+       }
+
+       for i, bucket := range t.buckets {
+               if bucket == host {
+                       delete(t.buckets, i)
+                       t.endpointNum--
+                       t.removePerm(i)
+                       t.resetPerms()
+                       t.populate()
+                       return true
+               }
+       }
+
+       return false
+}
+
+func _hash1(key string) uint32 {
+       out := sha3.Sum512([]byte(key))
+       return binary.LittleEndian.Uint32(out[:])
+}
+
+func _hash2(key string) uint32 {
+       out := blake2b.Sum512([]byte(key))
+       return binary.LittleEndian.Uint32(out[:])
+}
diff --git a/pixiu/pkg/cluster/loadbalancer/maglev/permutation_test.go 
b/pixiu/pkg/cluster/loadbalancer/maglev/permutation_test.go
new file mode 100644
index 00000000..4bda48f8
--- /dev/null
+++ b/pixiu/pkg/cluster/loadbalancer/maglev/permutation_test.go
@@ -0,0 +1,278 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package maglev
+
+import (
+       "fmt"
+       "testing"
+)
+
+import (
+       "github.com/stretchr/testify/assert"
+)
+
+const ConsistencyToleration = 3
+
+func WithLookUpTable(tableSize, nodeCount int, function func(*LookUpTable)) {
+       table := createTableWithNodes(tableSize, nodeCount)
+       table.Populate()
+       if function != nil {
+               function(table)
+       }
+}
+
+func createTableWithNodes(tableSize, nodeCount int) *LookUpTable {
+       nodes := make([]string, 0, nodeCount)
+       for i := 1; i <= nodeCount; i++ {
+               nodes = append(nodes, createEndpoint(i))
+       }
+       table, _ := NewLookUpTable(tableSize, nodes)
+       return table
+}
+
+func createEndpoint(id int) string {
+       return fmt.Sprintf("192.168.1.%d:%d", id, 1000+id)
+}
+
+func TestCreateLookUpTableWithoutTableSize(t *testing.T) {
+       testCases := []struct {
+               nodeCount  int
+               expectSize int
+       }{
+               {
+                       nodeCount:  0,
+                       expectSize: 307,
+               },
+               {
+                       nodeCount:  1,
+                       expectSize: 307,
+               },
+               {
+                       nodeCount:  3,
+                       expectSize: 307,
+               },
+               {
+                       nodeCount:  5,
+                       expectSize: 503,
+               },
+               {
+                       nodeCount:  15,
+                       expectSize: 1511,
+               },
+               {
+                       nodeCount:  100001,
+                       expectSize: 0,
+               },
+       }
+       for i, tc := range testCases {
+               table := createTableWithNodes(0, tc.nodeCount)
+               if i == 5 {
+                       assert.Nil(t, table, "Got non nil table")
+               } else {
+                       assert.Equal(t, tc.expectSize, table.size, "Wrong 
default table size")
+               }
+       }
+}
+
+func TestLookUpTable_Populate(t *testing.T) {
+       testCases := []struct {
+               nodeCount int
+               size      int
+       }{
+               {
+                       nodeCount: 0,
+                       size:      11,
+               },
+               {
+                       nodeCount: 3,
+                       size:      313,
+               },
+               {
+                       nodeCount: 5,
+                       size:      503,
+               },
+               {
+                       nodeCount: 10,
+                       size:      1013,
+               },
+               {
+                       nodeCount: 50,
+                       size:      5021,
+               },
+               {
+                       nodeCount: 250,
+                       size:      25447,
+               },
+       }
+
+       for _, tc := range testCases {
+               WithLookUpTable(tc.size, tc.nodeCount, func(table *LookUpTable) 
{
+                       checkUnPopulateSlot(t, table)
+                       checkMissingEndpoint(t, table)
+                       checkConsistency(t, table)
+               })
+       }
+}
+
+func TestLookUpTable_Get(t *testing.T) {
+       tableSize, nodeCount := 1033, 10
+       key := "/this/is/a/test"
+
+       table := createTableWithNodes(tableSize, nodeCount)
+
+       _, err := table.Get(key)
+       assert.NotNil(t, err, "Got endpoint before populating")
+
+       table.Populate()
+
+       ep, err := table.Get(key)
+       assert.Nil(t, err, "Fail to get endpoint")
+       assert.NotEqual(t, "", ep, "Wrong endpoint")
+}
+
+func TestLookUpTable_Add(t *testing.T) {
+       testCases := []struct {
+               nodeCount int
+               size      int
+               add       []int
+       }{
+               {
+                       nodeCount: 0,
+                       size:      11,
+                       add:       []int{1, 2, 3, 4, 5},
+               },
+               {
+                       nodeCount: 3,
+                       size:      313,
+                       add:       []int{4, 5, 6},
+               },
+               {
+                       nodeCount: 50,
+                       size:      5021,
+                       add:       []int{51, 52, 53, 54},
+               },
+       }
+
+       for _, tc := range testCases {
+               WithLookUpTable(tc.size, tc.nodeCount, func(table *LookUpTable) 
{
+                       for _, add := range tc.add {
+                               ep := createEndpoint(add)
+                               table.Add(ep)
+                               checkUnPopulateSlot(t, table)
+                               checkMissingEndpoint(t, table)
+                               checkConsistency(t, table)
+                       }
+               })
+       }
+}
+
+func TestLookUpTable_Remove(t *testing.T) {
+       testCases := []struct {
+               nodeCount int
+               size      int
+               delete    []int
+               wanted    []bool
+       }{
+               {
+                       nodeCount: 0,
+                       size:      11,
+                       delete:    []int{2},
+                       wanted:    []bool{false},
+               },
+               {
+                       nodeCount: 1,
+                       size:      101,
+                       delete:    []int{2, 1, 2},
+                       wanted:    []bool{false, true, false},
+               },
+               {
+                       nodeCount: 3,
+                       size:      307,
+                       delete:    []int{1, 2, 3, 4},
+                       wanted:    []bool{true, true, true, false},
+               },
+               {
+                       nodeCount: 10,
+                       size:      1013,
+                       delete:    []int{4, 5, 6, 11},
+                       wanted:    []bool{true, true, true, false},
+               },
+               {
+                       nodeCount: 50,
+                       size:      5021,
+                       delete:    []int{54, 15, 1, 59, 5, 1},
+                       wanted:    []bool{false, true, true, false, true, 
false},
+               },
+       }
+
+       for id, tc := range testCases {
+               WithLookUpTable(tc.size, tc.nodeCount, func(table *LookUpTable) 
{
+                       for i, del := range tc.delete {
+                               ep := createEndpoint(del)
+                               ret := table.Remove(ep)
+                               assert.Equal(t, tc.wanted[i], ret,
+                                       "Wrong state removing: %s in case %d", 
del, id)
+                               checkUnPopulateSlot(t, table)
+                               checkMissingEndpoint(t, table)
+                               checkEndpointNotIn(t, table, ep)
+                               checkConsistency(t, table)
+                       }
+               })
+       }
+}
+
+func checkUnPopulateSlot(t *testing.T, table *LookUpTable) {
+       if table.endpointNum == 0 {
+               return
+       }
+
+       for i, s := range table.slots {
+               assert.NotEqual(t, "", s,
+                       "Unpopulating slot in table(node=%d, factor=%d) at 
slot: %d", table.endpointNum, table.size, i)
+       }
+}
+
+func checkMissingEndpoint(t *testing.T, table *LookUpTable) {
+       for _, p := range table.permutations {
+               assert.Equal(t, true, p.hit > 0,
+                       "Missing endpoint: %s in table(node=%d, factor=%d)", 
table.slots[p.index], table.endpointNum, table.size)
+       }
+}
+
+func checkEndpointNotIn(t *testing.T, table *LookUpTable, endpoint string) {
+       for i, slot := range table.slots {
+               assert.NotEqual(t, endpoint, slot,
+                       "Unexpect endpoint %s in slot %d", endpoint, i)
+       }
+}
+
+func checkConsistency(t *testing.T, table *LookUpTable) {
+       if table.endpointNum == 0 {
+               return
+       }
+
+       avg := table.size / table.endpointNum
+       dist := make(map[string]int)
+       for _, slot := range table.slots {
+               dist[slot]++
+       }
+       for k, v := range dist {
+               assert.NotEqual(t, true, v > avg+ConsistencyToleration || v < 
avg-ConsistencyToleration,
+                       "%s with distributions %d not in %d +/- %d", k, v, avg, 
ConsistencyToleration)
+       }
+}
diff --git a/pixiu/pkg/cluster/loadbalancer/hash/consistent_hash.go 
b/pixiu/pkg/cluster/loadbalancer/ringhash/ring_hash.go
similarity index 50%
rename from pixiu/pkg/cluster/loadbalancer/hash/consistent_hash.go
rename to pixiu/pkg/cluster/loadbalancer/ringhash/ring_hash.go
index f56d6275..13b59ab7 100644
--- a/pixiu/pkg/cluster/loadbalancer/hash/consistent_hash.go
+++ b/pixiu/pkg/cluster/loadbalancer/ringhash/ring_hash.go
@@ -15,36 +15,61 @@
  * limitations under the License.
  */
 
-package consistent
+package ringhash
 
 import (
-       "fmt"
+       "math"
+)
+
+import (
+       "github.com/dubbogo/gost/hash/consistent"
 )
 
 import (
        "github.com/apache/dubbo-go-pixiu/pixiu/pkg/cluster/loadbalancer"
+       "github.com/apache/dubbo-go-pixiu/pixiu/pkg/logger"
        "github.com/apache/dubbo-go-pixiu/pixiu/pkg/model"
 )
 
 func init() {
-       loadbalancer.RegisterLoadBalancer(model.LoadBalanceConsistentHashing, 
ConsistentHashing{})
+       loadbalancer.RegisterLoadBalancer(model.LoadBalancerRingHashing, 
RingHashing{})
+       loadbalancer.RegisterConsistentHashInit(model.LoadBalancerRingHashing, 
NewRingHash)
 }
 
-type ConsistentHashing struct{}
+func NewRingHash(config model.ConsistentHash, endpoints []*model.Endpoint) 
model.LbConsistentHash {
+       var ops []consistent.Option
+
+       if config.ReplicaNum != 0 {
+               ops = append(ops, consistent.WithReplicaNum(config.ReplicaNum))
+       }
+
+       if config.MaxVnodeNum != 0 {
+               ops = append(ops, 
consistent.WithMaxVnodeNum(int(config.MaxVnodeNum)))
+       } else {
+               config.MaxVnodeNum = math.MinInt32
+       }
+
+       h := consistent.NewConsistentHash(ops...)
+       for _, endpoint := range endpoints {
+               h.Add(endpoint.GetHost())
+       }
+       return h
+}
 
-func (ConsistentHashing) Handler(c *model.ClusterConfig, policy 
model.LbPolicy) *model.Endpoint {
-       u := c.Hash.ConsistentHash.Hash(policy.GenerateHash())
+type RingHashing struct{}
 
-       hash, err := c.Hash.ConsistentHash.GetHash(u)
+func (r RingHashing) Handler(c *model.ClusterConfig, policy model.LbPolicy) 
*model.Endpoint {
+       u := c.ConsistentHash.Hash.Hash(policy.GenerateHash())
+       hash, err := c.ConsistentHash.Hash.GetHash(u)
        if err != nil {
+               logger.Warnf("[dubbo-go-pixiu] error of getting from ring hash: 
%v", err)
                return nil
        }
 
        endpoints := c.GetEndpoint(true)
 
        for _, endpoint := range endpoints {
-               address := endpoint.Address
-               if fmt.Sprintf("%s:%d", address.Address, address.Port) == hash {
+               if endpoint.GetHost() == hash {
                        return endpoint
                }
        }
diff --git a/pixiu/pkg/cluster/loadbalancer/hash/consistent_hash_test.go 
b/pixiu/pkg/cluster/loadbalancer/ringhash/ring_hash_test.go
similarity index 85%
rename from pixiu/pkg/cluster/loadbalancer/hash/consistent_hash_test.go
rename to pixiu/pkg/cluster/loadbalancer/ringhash/ring_hash_test.go
index 158a895f..ea8ceded 100644
--- a/pixiu/pkg/cluster/loadbalancer/hash/consistent_hash_test.go
+++ b/pixiu/pkg/cluster/loadbalancer/ringhash/ring_hash_test.go
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package consistent
+package ringhash
 
 import (
        "fmt"
@@ -41,12 +41,16 @@ func TestHashRing(t *testing.T) {
                        Address: model.SocketAddress{Address: "192.168.1." + 
name, Port: 1000 + i}})
        }
 
-       cluster := &model.ClusterConfig{Name: "cluster1", Endpoints: nodes,
-               LbStr: model.LoadBalanceConsistentHashing, Hash: 
model.Hash{ReplicaNum: 10, MaxVnodeNum: 1023}}
+       cluster := &model.ClusterConfig{
+               Name:           "cluster1",
+               Endpoints:      nodes,
+               LbStr:          model.LoadBalancerRingHashing,
+               ConsistentHash: model.ConsistentHash{ReplicaNum: 10, 
MaxVnodeNum: 1023},
+       }
        cluster.CreateConsistentHash()
 
        var (
-               hashing = ConsistentHashing{}
+               hashing = RingHashing{}
                path    string
        )
 
diff --git a/pixiu/pkg/model/cluster.go b/pixiu/pkg/model/cluster.go
index 6239c1c6..2040305e 100644
--- a/pixiu/pkg/model/cluster.go
+++ b/pixiu/pkg/model/cluster.go
@@ -19,11 +19,6 @@ package model
 
 import (
        "fmt"
-       "math"
-)
-
-import (
-       "github.com/dubbogo/gost/hash/consistent"
 )
 
 const (
@@ -62,7 +57,7 @@ type (
                Type                 DiscoveryType       `yaml:"-" json:"-"`    
   // Type the cluster discovery type
                EdsClusterConfig     EdsClusterConfig    
`yaml:"eds_cluster_config" json:"eds_cluster_config" 
mapstructure:"eds_cluster_config"`
                LbStr                LbPolicyType        `yaml:"lb_policy" 
json:"lb_policy"`   // Lb the cluster select node used loadBalance policy
-               Hash                 Hash                `yaml:"consistent" 
json:"consistent"` // Consistent hash config info
+               ConsistentHash       ConsistentHash      `yaml:"consistent" 
json:"consistent"` // Consistent hash config info
                HealthChecks         []HealthCheckConfig `yaml:"health_checks" 
json:"health_checks"`
                Endpoints            []*Endpoint         `yaml:"endpoints" 
json:"endpoints"`
                PrePickEndpointIndex int
@@ -100,10 +95,12 @@ type (
                UnHealthy bool
        }
 
-       Hash struct {
-               ReplicaNum     int   `yaml:"replica_num" json:"replica_num"`
-               MaxVnodeNum    int32 `yaml:"max_vnode_num" json:"max_vnode_num"`
-               ConsistentHash *consistent.Consistent
+       // ConsistentHash methods include: RingHash, MaglevHash
+       ConsistentHash struct {
+               ReplicaNum      int   `yaml:"replica_num" json:"replica_num"`
+               MaxVnodeNum     int32 `yaml:"max_vnode_num" 
json:"max_vnode_num"`
+               MaglevTableSize int   `yaml:"maglev_table_size" 
json:"maglev_table_size"`
+               Hash            LbConsistentHash
        }
 )
 
@@ -118,26 +115,13 @@ func (c *ClusterConfig) GetEndpoint(mustHealth bool) 
[]*Endpoint {
        return endpoints
 }
 
+// CreateConsistentHash creates consistent hashing algorithms for Load Balance.
 func (c *ClusterConfig) CreateConsistentHash() {
-       if c.LbStr == LoadBalanceConsistentHashing {
-               var ops []consistent.Option
-
-               if c.Hash.ReplicaNum != 0 {
-                       ops = append(ops, 
consistent.WithReplicaNum(c.Hash.ReplicaNum))
-               }
-
-               if c.Hash.MaxVnodeNum != 0 {
-                       ops = append(ops, 
consistent.WithMaxVnodeNum(int(c.Hash.MaxVnodeNum)))
-               } else {
-                       c.Hash.MaxVnodeNum = math.MinInt32
-               }
-
-               h := consistent.NewConsistentHash(ops...)
-
-               for _, endpoint := range c.Endpoints {
-                       address := endpoint.Address
-                       h.Add(fmt.Sprintf("%s:%d", address.Address, 
address.Port))
-               }
-               c.Hash.ConsistentHash = h
+       if newConsistentHash, ok := ConsistentHashInitMap[c.LbStr]; ok {
+               c.ConsistentHash.Hash = newConsistentHash(c.ConsistentHash, 
c.Endpoints)
        }
 }
+
+func (e Endpoint) GetHost() string {
+       return fmt.Sprintf("%s:%d", e.Address.Address, e.Address.Port)
+}
diff --git a/pixiu/pkg/model/lb.go b/pixiu/pkg/model/lb.go
index f41b03f2..906fb086 100644
--- a/pixiu/pkg/model/lb.go
+++ b/pixiu/pkg/model/lb.go
@@ -21,17 +21,33 @@ package model
 type LbPolicyType string
 
 const (
-       LoadBalancerRand             LbPolicyType = "Rand"
-       LoadBalancerRoundRobin       LbPolicyType = "RoundRobin"
-       LoadBalanceConsistentHashing LbPolicyType = "ConsistentHashing"
+       LoadBalancerRand          LbPolicyType = "Rand"
+       LoadBalancerRoundRobin    LbPolicyType = "RoundRobin"
+       LoadBalancerRingHashing   LbPolicyType = "RingHashing"
+       LoadBalancerMaglevHashing LbPolicyType = "MaglevHashing"
 )
 
 var LbPolicyTypeValue = map[string]LbPolicyType{
-       "Rand":              LoadBalancerRand,
-       "RoundRobin":        LoadBalancerRoundRobin,
-       "ConsistentHashing": LoadBalanceConsistentHashing,
+       "Rand":          LoadBalancerRand,
+       "RoundRobin":    LoadBalancerRoundRobin,
+       "RingHashing":   LoadBalancerRingHashing,
+       "MaglevHashing": LoadBalancerMaglevHashing,
 }
 
 type LbPolicy interface {
        GenerateHash() string
 }
+
+// LbConsistentHash supports consistent hash load balancing
+type LbConsistentHash interface {
+       Hash(key string) uint32
+       Add(host string)
+       Get(key string) (string, error)
+       GetHash(key uint32) (string, error)
+       Remove(host string) bool
+}
+
+type ConsistentHashInitFunc = func(ConsistentHash, []*Endpoint) LbConsistentHash
+
+// ConsistentHashInitMap stores the Init functions for consistent hash load balancing
+var ConsistentHashInitMap = map[LbPolicyType]ConsistentHashInitFunc{}
diff --git a/pixiu/pkg/pluginregistry/registry.go b/pixiu/pkg/pluginregistry/registry.go
index 9078473b..c3446f43 100644
--- a/pixiu/pkg/pluginregistry/registry.go
+++ b/pixiu/pkg/pluginregistry/registry.go
@@ -20,7 +20,9 @@ package pluginregistry
 import (
        _ "github.com/apache/dubbo-go-pixiu/pixiu/pkg/adapter/dubboregistry"
        _ "github.com/apache/dubbo-go-pixiu/pixiu/pkg/adapter/springcloud"
+       _ "github.com/apache/dubbo-go-pixiu/pixiu/pkg/cluster/loadbalancer/maglev"
        _ "github.com/apache/dubbo-go-pixiu/pixiu/pkg/cluster/loadbalancer/rand"
+       _ "github.com/apache/dubbo-go-pixiu/pixiu/pkg/cluster/loadbalancer/ringhash"
        _ "github.com/apache/dubbo-go-pixiu/pixiu/pkg/cluster/loadbalancer/roundrobin"
        _ "github.com/apache/dubbo-go-pixiu/pixiu/pkg/filter/accesslog"
        _ "github.com/apache/dubbo-go-pixiu/pixiu/pkg/filter/auth/jwt"
diff --git a/pixiu/pkg/server/cluster_manager.go b/pixiu/pkg/server/cluster_manager.go
index 49f5694b..78793618 100644
--- a/pixiu/pkg/server/cluster_manager.go
+++ b/pixiu/pkg/server/cluster_manager.go
@@ -259,8 +259,8 @@ func (s *ClusterStore) SetEndpoint(clusterName string, endpoint *model.Endpoint)
                        // endpoint create
                        c.Endpoints = append(c.Endpoints, endpoint)
                        cluster.AddEndpoint(endpoint)
-                       if c.Hash.ConsistentHash != nil {
-                               c.Hash.ConsistentHash.Add(endpoint.Address.Address)
+                       if c.ConsistentHash.Hash != nil {
+                               c.ConsistentHash.Hash.Add(endpoint.GetHost())
                        }
                        return
                }
@@ -278,8 +278,8 @@ func (s *ClusterStore) DeleteEndpoint(clusterName string, endpointID string) {
                                if e.ID == endpointID {
                                        cluster.RemoveEndpoint(e)
                                        c.Endpoints = append(c.Endpoints[:i], c.Endpoints[i+1:]...)
-                                       if c.Hash.ConsistentHash != nil {
-                                               c.Hash.ConsistentHash.Remove(e.Address.Address)
+                                       if c.ConsistentHash.Hash != nil {
+                                               c.ConsistentHash.Hash.Remove(e.GetHost())
                                        }
                                        return
                                }
@@ -288,7 +288,7 @@ func (s *ClusterStore) DeleteEndpoint(clusterName string, endpointID string) {
                        return
                }
        }
-       logger.Warnf("not found  cluster %s", clusterName)
+       logger.Warnf("not found cluster %s", clusterName)
 }
 
 func (s *ClusterStore) HasCluster(clusterName string) bool {

Reply via email to