This is an automated email from the ASF dual-hosted git repository.

justxuewei pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/dubbo-go.git


The following commit(s) were added to refs/heads/main by this push:
     new 19d1da0a0 feat: interleaved weighted round-robin load balance (#2405)
19d1da0a0 is described below

commit 19d1da0a066bd135369eb2ce8611444fb3ad6514
Author: dongjiang <dongjiang1...@126.com>
AuthorDate: Fri Dec 29 10:49:04 2023 +0800

    feat: interleaved weighted round-robin load balance (#2405)
    
    * dongjiang, add interleaved weighted roundrobin loadbalance
    
    Signed-off-by: dongjiang1989 <dongjiang1...@126.com>
    
    * add benchmarks test case
    
    Signed-off-by: dongjiang1989 <dongjiang1...@126.com>
    
    * fix unittest case name
    
    Signed-off-by: dongjiang1989 <dongjiang1...@126.com>
    
    ---------
    
    Signed-off-by: dongjiang1989 <dongjiang1...@126.com>
---
 .../interleavedweightedroundrobin/doc.go           |  12 +-
 .../interleavedweightedroundrobin/iwrr.go          | 130 +++++++++++++++++++++
 .../interleavedweightedroundrobin/loadbalance.go   |  50 ++++++++
 .../loadbalance_test.go                            |  73 ++++++++++++
 cluster/loadbalance/loadbalance_benchmarks_test.go |  80 +++++++++++++
 common/constant/loadbalance.go                     |  13 ++-
 6 files changed, 342 insertions(+), 16 deletions(-)

diff --git a/common/constant/loadbalance.go 
b/cluster/loadbalance/interleavedweightedroundrobin/doc.go
similarity index 71%
copy from common/constant/loadbalance.go
copy to cluster/loadbalance/interleavedweightedroundrobin/doc.go
index dde844337..2f5433c13 100644
--- a/common/constant/loadbalance.go
+++ b/cluster/loadbalance/interleavedweightedroundrobin/doc.go
@@ -15,13 +15,5 @@
  * limitations under the License.
  */
 
-package constant
-
-const (
-       LoadBalanceKeyConsistentHashing = "consistenthashing"
-       LoadBalanceKeyLeastActive       = "leastactive"
-       LoadBalanceKeyRandom            = "random"
-       LoadBalanceKeyRoundRobin        = "roundrobin"
-       LoadBalanceKeyP2C               = "p2c"
-       LoadXDSRingHash                 = "xdsringhash"
-)
+// Package iwrr implements the interleaved weighted round-robin load balance strategy.
+package iwrr
diff --git a/cluster/loadbalance/interleavedweightedroundrobin/iwrr.go 
b/cluster/loadbalance/interleavedweightedroundrobin/iwrr.go
new file mode 100644
index 000000000..e1d78a370
--- /dev/null
+++ b/cluster/loadbalance/interleavedweightedroundrobin/iwrr.go
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package iwrr
+
+import (
+       "math/rand"
+       "sync"
+
+       "dubbo.apache.org/dubbo-go/v3/cluster/loadbalance"
+       "dubbo.apache.org/dubbo-go/v3/protocol"
+)
+
+type iwrrEntry struct {
+       weight  int64
+       invoker protocol.Invoker
+
+       next *iwrrEntry
+}
+
+type iwrrQueue struct {
+       head *iwrrEntry
+       tail *iwrrEntry
+}
+
+func NewIwrrQueue() *iwrrQueue {
+       return &iwrrQueue{}
+}
+
+func (item *iwrrQueue) push(entry *iwrrEntry) {
+       entry.next = nil
+       tail := item.tail
+       item.tail = entry
+       if tail == nil {
+               item.head = entry
+       } else {
+               tail.next = entry
+       }
+}
+
+func (item *iwrrQueue) pop() *iwrrEntry {
+       head := item.head
+       next := head.next
+       head.next = nil
+       item.head = next
+       if next == nil {
+               item.tail = nil
+       }
+       return head
+}
+
+func (item *iwrrQueue) empty() bool {
+       return item.head == nil
+}
+
+// interleavedweightedRoundRobin holds the current and next entry queues, the weight step, and the mutex guarding them.
+type interleavedweightedRoundRobin struct {
+       current *iwrrQueue
+       next    *iwrrQueue
+       step    int64
+       mu      sync.Mutex
+}
+
+func NewInterleavedweightedRoundRobin(invokers []protocol.Invoker, invocation 
protocol.Invocation) *interleavedweightedRoundRobin {
+       iwrrp := new(interleavedweightedRoundRobin)
+       iwrrp.current = NewIwrrQueue()
+       iwrrp.next = NewIwrrQueue()
+
+       size := uint64(len(invokers))
+       offset := rand.Uint64() % size
+       step := int64(0)
+       for idx := uint64(0); idx < size; idx++ {
+               invoker := invokers[(idx+offset)%size]
+               weight := loadbalance.GetWeight(invoker, invocation)
+               step = gcdInt(step, weight)
+               iwrrp.current.push(&iwrrEntry{
+                       invoker: invoker,
+                       weight:  weight,
+               })
+       }
+       iwrrp.step = step
+
+       return iwrrp
+}
+
+func (iwrr *interleavedweightedRoundRobin) Pick(invocation 
protocol.Invocation) protocol.Invoker {
+       iwrr.mu.Lock()
+       defer iwrr.mu.Unlock()
+
+       if iwrr.current.empty() {
+               iwrr.current, iwrr.next = iwrr.next, iwrr.current
+       }
+
+       entry := iwrr.current.pop()
+       entry.weight -= iwrr.step
+
+       if entry.weight > 0 {
+               iwrr.current.push(entry)
+       } else {
+               weight := loadbalance.GetWeight(entry.invoker, invocation)
+               if weight < 0 {
+                       weight = 0
+               }
+               entry.weight = weight
+               iwrr.next.push(entry)
+       }
+
+       return entry.invoker
+}
+
+func gcdInt(a, b int64) int64 {
+       for b != 0 {
+               a, b = b, a%b
+       }
+       return a
+}
diff --git a/cluster/loadbalance/interleavedweightedroundrobin/loadbalance.go 
b/cluster/loadbalance/interleavedweightedroundrobin/loadbalance.go
new file mode 100644
index 000000000..20f9c3c20
--- /dev/null
+++ b/cluster/loadbalance/interleavedweightedroundrobin/loadbalance.go
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package iwrr
+
+import (
+       "dubbo.apache.org/dubbo-go/v3/cluster/loadbalance"
+       "dubbo.apache.org/dubbo-go/v3/common/constant"
+       "dubbo.apache.org/dubbo-go/v3/common/extension"
+       "dubbo.apache.org/dubbo-go/v3/protocol"
+)
+
+func init() {
+       
extension.SetLoadbalance(constant.LoadBalanceKeyInterleavedWeightedRoundRobin, 
newInterleavedWeightedRoundRobinBalance)
+}
+
+type interleavedWeightedRoundRobinBalance struct{}
+
+// newInterleavedWeightedRoundRobinBalance returns an interleaved weighted 
round robin load balance.
+func newInterleavedWeightedRoundRobinBalance() loadbalance.LoadBalance {
+       return &interleavedWeightedRoundRobinBalance{}
+}
+
+// Select gets an invoker based on the interleaved weighted round robin load 
balancing strategy
+func (lb *interleavedWeightedRoundRobinBalance) Select(invokers 
[]protocol.Invoker, invocation protocol.Invocation) protocol.Invoker {
+       count := len(invokers)
+       if count == 0 {
+               return nil
+       }
+       if count == 1 {
+               return invokers[0]
+       }
+
+       iwrrp := NewInterleavedweightedRoundRobin(invokers, invocation)
+       return iwrrp.Pick(invocation)
+}
diff --git 
a/cluster/loadbalance/interleavedweightedroundrobin/loadbalance_test.go 
b/cluster/loadbalance/interleavedweightedroundrobin/loadbalance_test.go
new file mode 100644
index 000000000..9e926adf3
--- /dev/null
+++ b/cluster/loadbalance/interleavedweightedroundrobin/loadbalance_test.go
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package iwrr
+
+import (
+       "fmt"
+       "testing"
+
+       "dubbo.apache.org/dubbo-go/v3/common"
+       "dubbo.apache.org/dubbo-go/v3/common/constant"
+       "dubbo.apache.org/dubbo-go/v3/protocol"
+       "dubbo.apache.org/dubbo-go/v3/protocol/invocation"
+       "github.com/stretchr/testify/assert"
+)
+
+func TestIWrrRoundRobinSelect(t *testing.T) {
+       loadBalance := newInterleavedWeightedRoundRobinBalance()
+
+       var invokers []protocol.Invoker
+
+       url, _ := 
common.NewURL(fmt.Sprintf("dubbo://%s:%d/org.apache.demo.HelloService",
+               constant.LocalHostValue, constant.DefaultPort))
+       invokers = append(invokers, protocol.NewBaseInvoker(url))
+       i := loadBalance.Select(invokers, &invocation.RPCInvocation{})
+       assert.True(t, i.GetURL().URLEqual(url))
+
+       for i := 1; i < 10; i++ {
+               url, _ := 
common.NewURL(fmt.Sprintf("dubbo://192.168.1.%v:20000/org.apache.demo.HelloService",
 i))
+               invokers = append(invokers, protocol.NewBaseInvoker(url))
+       }
+       loadBalance.Select(invokers, &invocation.RPCInvocation{})
+}
+
+func TestIWrrRoundRobinByWeight(t *testing.T) {
+       loadBalance := newInterleavedWeightedRoundRobinBalance()
+
+       var invokers []protocol.Invoker
+       loop := 10
+       for i := 1; i <= loop; i++ {
+               url, _ := 
common.NewURL(fmt.Sprintf("dubbo://192.168.1.%v:20000/org.apache.demo.HelloService?weight=%v",
 i, i))
+               invokers = append(invokers, protocol.NewBaseInvoker(url))
+       }
+
+       loop = (1 + loop) * loop / 2
+       selected := make(map[protocol.Invoker]int)
+
+       for i := 1; i <= loop; i++ {
+               invoker := loadBalance.Select(invokers, 
&invocation.RPCInvocation{})
+               selected[invoker]++
+       }
+
+       sum := 0
+       for _, value := range selected {
+               sum += value
+       }
+
+       assert.Equal(t, loop, sum)
+}
diff --git a/cluster/loadbalance/loadbalance_benchmarks_test.go 
b/cluster/loadbalance/loadbalance_benchmarks_test.go
new file mode 100644
index 000000000..c1d27b7de
--- /dev/null
+++ b/cluster/loadbalance/loadbalance_benchmarks_test.go
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package loadbalance_test
+
+import (
+       "fmt"
+       "testing"
+
+       "dubbo.apache.org/dubbo-go/v3/cluster/loadbalance"
+       _ "dubbo.apache.org/dubbo-go/v3/cluster/loadbalance/consistenthashing"
+       _ 
"dubbo.apache.org/dubbo-go/v3/cluster/loadbalance/interleavedweightedroundrobin"
+       _ "dubbo.apache.org/dubbo-go/v3/cluster/loadbalance/leastactive"
+       _ "dubbo.apache.org/dubbo-go/v3/cluster/loadbalance/p2c"
+       _ "dubbo.apache.org/dubbo-go/v3/cluster/loadbalance/random"
+       _ "dubbo.apache.org/dubbo-go/v3/cluster/loadbalance/ringhash"
+       _ "dubbo.apache.org/dubbo-go/v3/cluster/loadbalance/roundrobin"
+       "dubbo.apache.org/dubbo-go/v3/common"
+       "dubbo.apache.org/dubbo-go/v3/common/constant"
+       "dubbo.apache.org/dubbo-go/v3/common/extension"
+       "dubbo.apache.org/dubbo-go/v3/protocol"
+       "dubbo.apache.org/dubbo-go/v3/protocol/invocation"
+)
+
+func Generate() []protocol.Invoker {
+       var invokers []protocol.Invoker
+       for i := 1; i < 256; i++ {
+               url, _ := 
common.NewURL(fmt.Sprintf("dubbo://192.168.1.%v:20000/org.apache.demo.HelloService",
 i))
+               invokers = append(invokers, protocol.NewBaseInvoker(url))
+       }
+       return invokers
+}
+
+func Benchloadbalace(b *testing.B, lb loadbalance.LoadBalance) {
+       b.Helper()
+       invokers := Generate()
+       b.ReportAllocs()
+       b.ResetTimer()
+       for i := 0; i < b.N; i++ {
+               lb.Select(invokers, &invocation.RPCInvocation{})
+       }
+}
+
+func BenchmarkRoudrobinLoadbalace(b *testing.B) {
+       Benchloadbalace(b, 
extension.GetLoadbalance(constant.LoadBalanceKeyRoundRobin))
+}
+
+func BenchmarkLeastativeLoadbalace(b *testing.B) {
+       Benchloadbalace(b, 
extension.GetLoadbalance(constant.LoadBalanceKeyLeastActive))
+}
+
+func BenchmarkConsistenthashingLoadbalace(b *testing.B) {
+       Benchloadbalace(b, 
extension.GetLoadbalance(constant.LoadBalanceKeyConsistentHashing))
+}
+
+func BenchmarkP2CLoadbalace(b *testing.B) {
+       Benchloadbalace(b, extension.GetLoadbalance(constant.LoadBalanceKeyP2C))
+}
+
+func BenchmarkInterleavedWeightedRoundRobinLoadbalace(b *testing.B) {
+       Benchloadbalace(b, 
extension.GetLoadbalance(constant.LoadBalanceKeyInterleavedWeightedRoundRobin))
+}
+
+func BenchmarkRandomLoadbalace(b *testing.B) {
+       Benchloadbalace(b, 
extension.GetLoadbalance(constant.LoadBalanceKeyRandom))
+}
diff --git a/common/constant/loadbalance.go b/common/constant/loadbalance.go
index dde844337..09af051b4 100644
--- a/common/constant/loadbalance.go
+++ b/common/constant/loadbalance.go
@@ -18,10 +18,11 @@
 package constant
 
 const (
-       LoadBalanceKeyConsistentHashing = "consistenthashing"
-       LoadBalanceKeyLeastActive       = "leastactive"
-       LoadBalanceKeyRandom            = "random"
-       LoadBalanceKeyRoundRobin        = "roundrobin"
-       LoadBalanceKeyP2C               = "p2c"
-       LoadXDSRingHash                 = "xdsringhash"
+       LoadBalanceKeyConsistentHashing             = "consistenthashing"
+       LoadBalanceKeyLeastActive                   = "leastactive"
+       LoadBalanceKeyRandom                        = "random"
+       LoadBalanceKeyRoundRobin                    = "roundrobin"
+       LoadBalanceKeyP2C                           = "p2c"
+       LoadXDSRingHash                             = "xdsringhash"
+       LoadBalanceKeyInterleavedWeightedRoundRobin = 
"interleavedweightedroundrobin"
 )

Reply via email to