This is an automated email from the ASF dual-hosted git repository.
justxuewei pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/dubbo-go.git
The following commit(s) were added to refs/heads/main by this push:
new 19d1da0a0 feat: interleaved weighted round-robin load balance (#2405)
19d1da0a0 is described below
commit 19d1da0a066bd135369eb2ce8611444fb3ad6514
Author: dongjiang <[email protected]>
AuthorDate: Fri Dec 29 10:49:04 2023 +0800
feat: interleaved weighted round-robin load balance (#2405)
* dongjiang, add interleaved weighted roundrobin loadbalance
Signed-off-by: dongjiang1989 <[email protected]>
* add benchmarks test case
Signed-off-by: dongjiang1989 <[email protected]>
* fix unittest case name
Signed-off-by: dongjiang1989 <[email protected]>
---------
Signed-off-by: dongjiang1989 <[email protected]>
---
.../interleavedweightedroundrobin/doc.go | 12 +-
.../interleavedweightedroundrobin/iwrr.go | 130 +++++++++++++++++++++
.../interleavedweightedroundrobin/loadbalance.go | 50 ++++++++
.../loadbalance_test.go | 73 ++++++++++++
cluster/loadbalance/loadbalance_benchmarks_test.go | 80 +++++++++++++
common/constant/loadbalance.go | 13 ++-
6 files changed, 342 insertions(+), 16 deletions(-)
diff --git a/common/constant/loadbalance.go
b/cluster/loadbalance/interleavedweightedroundrobin/doc.go
similarity index 71%
copy from common/constant/loadbalance.go
copy to cluster/loadbalance/interleavedweightedroundrobin/doc.go
index dde844337..2f5433c13 100644
--- a/common/constant/loadbalance.go
+++ b/cluster/loadbalance/interleavedweightedroundrobin/doc.go
@@ -15,13 +15,5 @@
* limitations under the License.
*/
-package constant
-
-const (
- LoadBalanceKeyConsistentHashing = "consistenthashing"
- LoadBalanceKeyLeastActive = "leastactive"
- LoadBalanceKeyRandom = "random"
- LoadBalanceKeyRoundRobin = "roundrobin"
- LoadBalanceKeyP2C = "p2c"
- LoadXDSRingHash = "xdsringhash"
-)
+// Package iwrr implements the interleaved weighted round-robin load balance strategy.
+package iwrr
diff --git a/cluster/loadbalance/interleavedweightedroundrobin/iwrr.go
b/cluster/loadbalance/interleavedweightedroundrobin/iwrr.go
new file mode 100644
index 000000000..e1d78a370
--- /dev/null
+++ b/cluster/loadbalance/interleavedweightedroundrobin/iwrr.go
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package iwrr
+
+import (
+ "math/rand"
+ "sync"
+
+ "dubbo.apache.org/dubbo-go/v3/cluster/loadbalance"
+ "dubbo.apache.org/dubbo-go/v3/protocol"
+)
+
+// iwrrEntry is one node of an iwrrQueue: an invoker together with the weight
+// it has left.
+type iwrrEntry struct {
+ // weight is the remaining weight; Pick subtracts the scheduler's step from
+ // it each time the entry is picked.
+ weight int64
+ // invoker is the invoker returned when this entry is picked.
+ invoker protocol.Invoker
+
+ // next links to the following entry in the queue; push resets it to nil
+ // for the entry that becomes the tail.
+ next *iwrrEntry
+}
+
+// iwrrQueue is a singly linked FIFO queue of iwrrEntry values.
+// Its zero value is an empty queue.
+type iwrrQueue struct {
+	head *iwrrEntry // next entry to be popped; nil when the queue is empty
+	tail *iwrrEntry // most recently pushed entry; nil when the queue is empty
+}
+
+// NewIwrrQueue returns an empty queue.
+func NewIwrrQueue() *iwrrQueue {
+	return &iwrrQueue{}
+}
+
+// push appends entry to the tail of the queue. Whatever entry.next pointed to
+// before the call is discarded.
+func (item *iwrrQueue) push(entry *iwrrEntry) {
+	entry.next = nil
+	if item.tail == nil {
+		item.head = entry
+	} else {
+		item.tail.next = entry
+	}
+	item.tail = entry
+}
+
+// pop removes and returns the entry at the head of the queue.
+// It returns nil when the queue is empty rather than dereferencing a nil head.
+func (item *iwrrQueue) pop() *iwrrEntry {
+	head := item.head
+	if head == nil {
+		return nil
+	}
+
+	item.head = head.next
+	head.next = nil
+	if item.head == nil {
+		// The popped entry was the last one, so the tail must be cleared too.
+		item.tail = nil
+	}
+	return head
+}
+
+// empty reports whether the queue holds no entries.
+func (item *iwrrQueue) empty() bool {
+	return item.head == nil
+}
+
+// interleavedweightedRoundRobin holds the scheduling state that Pick works on.
+type interleavedweightedRoundRobin struct {
+ // current is the queue Pick pops entries from.
+ current *iwrrQueue
+ // next collects entries whose weight has been used up; Pick swaps it with
+ // current once current is empty.
+ next *iwrrQueue
+ // step is subtracted from an entry's weight on every pick; the constructor
+ // sets it to the GCD of the initial weights.
+ step int64
+ // mu is held for the whole duration of Pick.
+ mu sync.Mutex
+}
+
+// NewInterleavedweightedRoundRobin builds the scheduling state for invokers.
+//
+// Every invoker is queued once together with the weight reported by
+// loadbalance.GetWeight for invocation, starting at a random offset into
+// invokers. The step used by Pick is the greatest common divisor of those
+// weights. Negative weights are treated as 0, the same way Pick treats them
+// when it refreshes a weight.
+//
+// When invokers is empty the returned value has empty queues.
+func NewInterleavedweightedRoundRobin(invokers []protocol.Invoker, invocation protocol.Invocation) *interleavedweightedRoundRobin {
+	iwrrp := &interleavedweightedRoundRobin{
+		current: NewIwrrQueue(),
+		next:    NewIwrrQueue(),
+	}
+
+	size := uint64(len(invokers))
+	if size == 0 {
+		// rand.Uint64() % 0 would panic with an integer division by zero.
+		return iwrrp
+	}
+
+	offset := rand.Uint64() % size
+	step := int64(0)
+	for idx := uint64(0); idx < size; idx++ {
+		invoker := invokers[(idx+offset)%size]
+		weight := loadbalance.GetWeight(invoker, invocation)
+		if weight < 0 {
+			// Keep the step non-negative; a negative step would make Pick
+			// increase weights instead of consuming them.
+			weight = 0
+		}
+		step = gcdInt(step, weight)
+		iwrrp.current.push(&iwrrEntry{
+			invoker: invoker,
+			weight:  weight,
+		})
+	}
+	iwrrp.step = step
+
+	return iwrrp
+}
+
+// Pick returns the invoker at the head of the current queue and updates the
+// scheduling state under iwrr.mu.
+//
+// The picked entry's weight is reduced by iwrr.step. While weight remains the
+// entry is re-queued on the current queue; otherwise its weight is re-read
+// through loadbalance.GetWeight (negative values become 0) and the entry is
+// parked on the next queue. When the current queue is empty the two queues are
+// swapped first.
+//
+// Pick returns nil when both queues are empty.
+func (iwrr *interleavedweightedRoundRobin) Pick(invocation protocol.Invocation) protocol.Invoker {
+	iwrr.mu.Lock()
+	defer iwrr.mu.Unlock()
+
+	if iwrr.current.empty() {
+		iwrr.current, iwrr.next = iwrr.next, iwrr.current
+		if iwrr.current.empty() {
+			// Nothing was ever queued; popping would dereference a nil head.
+			return nil
+		}
+	}
+
+	entry := iwrr.current.pop()
+	entry.weight -= iwrr.step
+	if entry.weight > 0 {
+		iwrr.current.push(entry)
+		return entry.invoker
+	}
+
+	// The entry used up its weight for this round: refresh it and park the
+	// entry until the queues are swapped.
+	weight := loadbalance.GetWeight(entry.invoker, invocation)
+	if weight < 0 {
+		weight = 0
+	}
+	entry.weight = weight
+	iwrr.next.push(entry)
+
+	return entry.invoker
+}
+
+// gcdInt returns the greatest common divisor of a and b as a non-negative
+// number. If one argument is 0 the result is the absolute value of the other,
+// so gcdInt(0, 0) is 0.
+func gcdInt(a, b int64) int64 {
+	for b != 0 {
+		a, b = b, a%b
+	}
+	if a < 0 {
+		// Go's % keeps the sign of the dividend, so the loop can end on a
+		// negative divisor when an argument is negative.
+		return -a
+	}
+	return a
+}
diff --git a/cluster/loadbalance/interleavedweightedroundrobin/loadbalance.go
b/cluster/loadbalance/interleavedweightedroundrobin/loadbalance.go
new file mode 100644
index 000000000..20f9c3c20
--- /dev/null
+++ b/cluster/loadbalance/interleavedweightedroundrobin/loadbalance.go
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package iwrr
+
+import (
+ "dubbo.apache.org/dubbo-go/v3/cluster/loadbalance"
+ "dubbo.apache.org/dubbo-go/v3/common/constant"
+ "dubbo.apache.org/dubbo-go/v3/common/extension"
+ "dubbo.apache.org/dubbo-go/v3/protocol"
+)
+
+// init registers the constructor of this load balancer under
+// constant.LoadBalanceKeyInterleavedWeightedRoundRobin.
+func init() {
+	extension.SetLoadbalance(
+		constant.LoadBalanceKeyInterleavedWeightedRoundRobin,
+		newInterleavedWeightedRoundRobinBalance,
+	)
+}
+
+// interleavedWeightedRoundRobinBalance is the load balancer whose Select
+// method applies the interleaved weighted round robin strategy. It has no fields.
+type interleavedWeightedRoundRobinBalance struct{}
+
+// newInterleavedWeightedRoundRobinBalance returns an interleaved weighted
+// round robin load balance.
+func newInterleavedWeightedRoundRobinBalance() loadbalance.LoadBalance {
+	return new(interleavedWeightedRoundRobinBalance)
+}
+
+// Select gets an invoker based on the interleaved weighted round robin load
+// balancing strategy.
+//
+// It returns nil when invokers is empty and the sole invoker when there is
+// exactly one. Otherwise it builds a new scheduler from invokers and returns
+// that scheduler's first pick.
+func (lb *interleavedWeightedRoundRobinBalance) Select(invokers []protocol.Invoker, invocation protocol.Invocation) protocol.Invoker {
+	switch len(invokers) {
+	case 0:
+		return nil
+	case 1:
+		return invokers[0]
+	}
+
+	// The scheduler is created per call, so no queue state is carried over
+	// from one Select call to the next.
+	scheduler := NewInterleavedweightedRoundRobin(invokers, invocation)
+	return scheduler.Pick(invocation)
+}
diff --git
a/cluster/loadbalance/interleavedweightedroundrobin/loadbalance_test.go
b/cluster/loadbalance/interleavedweightedroundrobin/loadbalance_test.go
new file mode 100644
index 000000000..9e926adf3
--- /dev/null
+++ b/cluster/loadbalance/interleavedweightedroundrobin/loadbalance_test.go
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package iwrr
+
+import (
+ "fmt"
+ "testing"
+
+ "dubbo.apache.org/dubbo-go/v3/common"
+ "dubbo.apache.org/dubbo-go/v3/common/constant"
+ "dubbo.apache.org/dubbo-go/v3/protocol"
+ "dubbo.apache.org/dubbo-go/v3/protocol/invocation"
+ "github.com/stretchr/testify/assert"
+)
+
+// TestIWrrRoundRobinSelect checks that Select returns the only invoker when
+// given a single one, and returns a non-nil invoker when given several.
+func TestIWrrRoundRobinSelect(t *testing.T) {
+	loadBalance := newInterleavedWeightedRoundRobinBalance()
+
+	var invokers []protocol.Invoker
+
+	url, err := common.NewURL(fmt.Sprintf("dubbo://%s:%d/org.apache.demo.HelloService",
+		constant.LocalHostValue, constant.DefaultPort))
+	assert.NoError(t, err)
+	invokers = append(invokers, protocol.NewBaseInvoker(url))
+
+	// With a single invoker, Select must hand back exactly that invoker.
+	only := loadBalance.Select(invokers, &invocation.RPCInvocation{})
+	assert.True(t, only.GetURL().URLEqual(url))
+
+	for i := 1; i < 10; i++ {
+		url, err := common.NewURL(fmt.Sprintf("dubbo://192.168.1.%v:20000/org.apache.demo.HelloService", i))
+		assert.NoError(t, err)
+		invokers = append(invokers, protocol.NewBaseInvoker(url))
+	}
+
+	// With several invokers the result must not be discarded unchecked.
+	assert.NotNil(t, loadBalance.Select(invokers, &invocation.RPCInvocation{}))
+}
+
+// TestIWrrRoundRobinByWeight calls Select once per unit of total weight over
+// ten invokers weighted 1..10 and checks that every call yields an invoker.
+func TestIWrrRoundRobinByWeight(t *testing.T) {
+	loadBalance := newInterleavedWeightedRoundRobinBalance()
+
+	var invokers []protocol.Invoker
+	loop := 10
+	for i := 1; i <= loop; i++ {
+		url, err := common.NewURL(fmt.Sprintf("dubbo://192.168.1.%v:20000/org.apache.demo.HelloService?weight=%v", i, i))
+		assert.NoError(t, err)
+		invokers = append(invokers, protocol.NewBaseInvoker(url))
+	}
+
+	// Total weight of the invokers: 1 + 2 + ... + 10.
+	loop = (1 + loop) * loop / 2
+	selected := make(map[protocol.Invoker]int)
+
+	for i := 1; i <= loop; i++ {
+		invoker := loadBalance.Select(invokers, &invocation.RPCInvocation{})
+		// Without this check a nil result would still be counted below.
+		assert.NotNil(t, invoker)
+		selected[invoker]++
+	}
+
+	sum := 0
+	for _, value := range selected {
+		sum += value
+	}
+
+	assert.Equal(t, loop, sum)
+}
diff --git a/cluster/loadbalance/loadbalance_benchmarks_test.go
b/cluster/loadbalance/loadbalance_benchmarks_test.go
new file mode 100644
index 000000000..c1d27b7de
--- /dev/null
+++ b/cluster/loadbalance/loadbalance_benchmarks_test.go
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package loadbalance_test
+
+import (
+ "fmt"
+ "testing"
+
+ "dubbo.apache.org/dubbo-go/v3/cluster/loadbalance"
+ _ "dubbo.apache.org/dubbo-go/v3/cluster/loadbalance/consistenthashing"
+ _
"dubbo.apache.org/dubbo-go/v3/cluster/loadbalance/interleavedweightedroundrobin"
+ _ "dubbo.apache.org/dubbo-go/v3/cluster/loadbalance/leastactive"
+ _ "dubbo.apache.org/dubbo-go/v3/cluster/loadbalance/p2c"
+ _ "dubbo.apache.org/dubbo-go/v3/cluster/loadbalance/random"
+ _ "dubbo.apache.org/dubbo-go/v3/cluster/loadbalance/ringhash"
+ _ "dubbo.apache.org/dubbo-go/v3/cluster/loadbalance/roundrobin"
+ "dubbo.apache.org/dubbo-go/v3/common"
+ "dubbo.apache.org/dubbo-go/v3/common/constant"
+ "dubbo.apache.org/dubbo-go/v3/common/extension"
+ "dubbo.apache.org/dubbo-go/v3/protocol"
+ "dubbo.apache.org/dubbo-go/v3/protocol/invocation"
+)
+
+// Generate returns 255 invokers, one for each of the URLs
+// dubbo://192.168.1.1:20000 through dubbo://192.168.1.255:20000.
+// The error returned by common.NewURL is discarded.
+func Generate() []protocol.Invoker {
+	const hosts = 255
+
+	invokers := make([]protocol.Invoker, 0, hosts)
+	for host := 1; host <= hosts; host++ {
+		rawURL := fmt.Sprintf("dubbo://192.168.1.%v:20000/org.apache.demo.HelloService", host)
+		url, _ := common.NewURL(rawURL)
+		invokers = append(invokers, protocol.NewBaseInvoker(url))
+	}
+	return invokers
+}
+
+// Benchloadbalace calls lb.Select b.N times on the invokers produced by
+// Generate, passing a new empty RPCInvocation on every call. Allocation
+// reporting is enabled and the timer is reset after the invokers are built.
+func Benchloadbalace(b *testing.B, lb loadbalance.LoadBalance) {
+	b.Helper()
+
+	targets := Generate()
+
+	b.ReportAllocs()
+	b.ResetTimer()
+	for remaining := b.N; remaining > 0; remaining-- {
+		lb.Select(targets, &invocation.RPCInvocation{})
+	}
+}
+
+// BenchmarkRoudrobinLoadbalace benchmarks the load balancer registered under
+// constant.LoadBalanceKeyRoundRobin.
+func BenchmarkRoudrobinLoadbalace(b *testing.B) {
+	lb := extension.GetLoadbalance(constant.LoadBalanceKeyRoundRobin)
+	Benchloadbalace(b, lb)
+}
+
+// BenchmarkLeastativeLoadbalace benchmarks the load balancer registered under
+// constant.LoadBalanceKeyLeastActive.
+func BenchmarkLeastativeLoadbalace(b *testing.B) {
+	lb := extension.GetLoadbalance(constant.LoadBalanceKeyLeastActive)
+	Benchloadbalace(b, lb)
+}
+
+// BenchmarkConsistenthashingLoadbalace benchmarks the load balancer registered
+// under constant.LoadBalanceKeyConsistentHashing.
+func BenchmarkConsistenthashingLoadbalace(b *testing.B) {
+	lb := extension.GetLoadbalance(constant.LoadBalanceKeyConsistentHashing)
+	Benchloadbalace(b, lb)
+}
+
+// BenchmarkP2CLoadbalace benchmarks the load balancer registered under
+// constant.LoadBalanceKeyP2C.
+func BenchmarkP2CLoadbalace(b *testing.B) {
+	lb := extension.GetLoadbalance(constant.LoadBalanceKeyP2C)
+	Benchloadbalace(b, lb)
+}
+
+// BenchmarkInterleavedWeightedRoundRobinLoadbalace benchmarks the load
+// balancer registered under constant.LoadBalanceKeyInterleavedWeightedRoundRobin.
+func BenchmarkInterleavedWeightedRoundRobinLoadbalace(b *testing.B) {
+	lb := extension.GetLoadbalance(constant.LoadBalanceKeyInterleavedWeightedRoundRobin)
+	Benchloadbalace(b, lb)
+}
+
+// BenchmarkRandomLoadbalace benchmarks the load balancer registered under
+// constant.LoadBalanceKeyRandom.
+func BenchmarkRandomLoadbalace(b *testing.B) {
+	lb := extension.GetLoadbalance(constant.LoadBalanceKeyRandom)
+	Benchloadbalace(b, lb)
+}
diff --git a/common/constant/loadbalance.go b/common/constant/loadbalance.go
index dde844337..09af051b4 100644
--- a/common/constant/loadbalance.go
+++ b/common/constant/loadbalance.go
@@ -18,10 +18,11 @@
package constant
const (
- LoadBalanceKeyConsistentHashing = "consistenthashing"
- LoadBalanceKeyLeastActive = "leastactive"
- LoadBalanceKeyRandom = "random"
- LoadBalanceKeyRoundRobin = "roundrobin"
- LoadBalanceKeyP2C = "p2c"
- LoadXDSRingHash = "xdsringhash"
+ LoadBalanceKeyConsistentHashing = "consistenthashing"
+ LoadBalanceKeyLeastActive = "leastactive"
+ LoadBalanceKeyRandom = "random"
+ LoadBalanceKeyRoundRobin = "roundrobin"
+ LoadBalanceKeyP2C = "p2c"
+ LoadXDSRingHash = "xdsringhash"
+ LoadBalanceKeyInterleavedWeightedRoundRobin =
"interleavedweightedroundrobin"
)