This is an automated email from the ASF dual-hosted git repository.
lizhanhui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git
The following commit(s) were added to refs/heads/master by this push:
new 80d9c07e [ISSUE #679] Fix judgment of topic route equality and
optimize loadBalancer (#680)
80d9c07e is described below
commit 80d9c07e3df703936467bcfc6a25f5c6a9a519a5
Author: Liu Shengzhong <[email protected]>
AuthorDate: Mon Feb 26 10:36:57 2024 +0800
[ISSUE #679] Fix judgment of topic route equality and optimize loadBalancer
(#680)
* fix(golang): correct judgment of topic route equality and optimize load
balancer
* fix(golang): add tests for routeEqual
* refactor
---
golang/client.go | 46 ++++++++++++++++++++++++++++++++++++----------
golang/client_test.go | 46 ++++++++++++++++++++++++++++++++++++++++++++++
golang/loadBalancer.go | 16 ++++++++++++++++
3 files changed, 98 insertions(+), 10 deletions(-)
diff --git a/golang/client.go b/golang/client.go
index 45e4b549..71c48198 100644
--- a/golang/client.go
+++ b/golang/client.go
@@ -24,7 +24,6 @@ import (
"encoding/hex"
"errors"
"fmt"
- "reflect"
"sync"
"time"
@@ -32,6 +31,7 @@ import (
"github.com/apache/rocketmq-clients/golang/v5/pkg/ticker"
"github.com/apache/rocketmq-clients/golang/v5/pkg/utils"
v2 "github.com/apache/rocketmq-clients/golang/v5/protocol/v2"
+ "github.com/golang/protobuf/proto"
"github.com/google/uuid"
"go.uber.org/atomic"
"go.uber.org/zap"
@@ -504,27 +504,40 @@ func (cli *defaultClient) startUp() error {
f := func() {
cli.router.Range(func(k, v interface{}) bool {
topic := k.(string)
- oldRoute := v
newRoute, err := cli.queryRoute(context.TODO(), topic,
cli.opts.timeout)
if err != nil {
cli.log.Errorf("scheduled queryRoute err=%v",
err)
}
- if newRoute == nil && oldRoute != nil {
+ if newRoute == nil && v != nil {
cli.log.Info("newRoute is nil, but oldRoute is
not. do not update")
return true
}
- if !reflect.DeepEqual(newRoute, oldRoute) {
+ var oldRoute []*v2.MessageQueue
+ if v != nil {
+ oldRoute = v.([]*v2.MessageQueue)
+ }
+ if !routeEqual(oldRoute, newRoute) {
cli.router.Store(k, newRoute)
switch impl := cli.clientImpl.(type) {
case *defaultProducer:
- plb, err :=
NewPublishingLoadBalancer(newRoute)
- if err == nil {
-
impl.publishingRouteDataResultCache.Store(topic, plb)
+ existing, ok :=
impl.publishingRouteDataResultCache.Load(topic)
+ if !ok {
+ plb, err :=
NewPublishingLoadBalancer(newRoute)
+ if err == nil {
+
impl.publishingRouteDataResultCache.Store(topic, plb)
+ }
+ } else {
+
impl.publishingRouteDataResultCache.Store(topic,
existing.(PublishingLoadBalancer).CopyAndUpdate(newRoute))
}
case *defaultSimpleConsumer:
- slb, err :=
NewSubscriptionLoadBalancer(newRoute)
- if err == nil {
-
impl.subTopicRouteDataResultCache.Store(topic, slb)
+ existing, ok :=
impl.subTopicRouteDataResultCache.Load(topic)
+ if !ok {
+ slb, err :=
NewSubscriptionLoadBalancer(newRoute)
+ if err == nil {
+
impl.subTopicRouteDataResultCache.Store(topic, slb)
+ }
+ } else {
+
impl.subTopicRouteDataResultCache.Store(topic,
existing.(SubscriptionLoadBalancer).CopyAndUpdate(newRoute))
}
}
}
@@ -534,6 +547,19 @@ func (cli *defaultClient) startUp() error {
ticker.Tick(f, time.Second*30, cli.done)
return nil
}
+
+func routeEqual(old, new []*v2.MessageQueue) bool {
+ if len(old) != len(new) {
+ return false
+ }
+ for i := 0; i < len(old); i++ {
+ if !proto.Equal(old[i], new[i]) {
+ return false
+ }
+ }
+ return true
+}
+
func (cli *defaultClient) notifyClientTermination() {
cli.log.Info("start notifyClientTermination")
ctx := cli.Sign(context.Background())
diff --git a/golang/client_test.go b/golang/client_test.go
index b46d2d73..4549bdfe 100644
--- a/golang/client_test.go
+++ b/golang/client_test.go
@@ -20,6 +20,7 @@ package golang
import (
"context"
"fmt"
+ "reflect"
"testing"
"time"
@@ -293,3 +294,48 @@ func TestRestoreDefaultClientSessionTwoErrors(t
*testing.T) {
assert.Equal(t, "Encountered error while receiving TelemetryCommand,
trying to recover", commandExecutionLog[0].Message)
assert.Equal(t, "Failed to recover, err=EOF",
commandExecutionLog[1].Message)
}
+
+func Test_routeEqual(t *testing.T) {
+ oldMq := &v2.MessageQueue{
+ Topic: &v2.Resource{
+ Name: "topic-test",
+ },
+ Id: 0,
+ Permission: v2.Permission_READ_WRITE,
+ Broker: &v2.Broker{
+ Name: "broker-test",
+ Id: 0,
+ Endpoints: fakeEndpoints(),
+ },
+ AcceptMessageTypes: []v2.MessageType{
+ v2.MessageType_NORMAL,
+ },
+ }
+ newMq := &v2.MessageQueue{
+ Topic: &v2.Resource{
+ Name: "topic-test",
+ },
+ Id: 0,
+ Permission: v2.Permission_READ_WRITE,
+ Broker: &v2.Broker{
+ Name: "broker-test",
+ Id: 0,
+ Endpoints: fakeEndpoints(),
+ },
+ AcceptMessageTypes: []v2.MessageType{
+ v2.MessageType_NORMAL,
+ },
+ }
+
+ newMq.ProtoReflect() // message internal field value will be changed
+
+ oldRoute := []*v2.MessageQueue{oldMq}
+ newRoute := []*v2.MessageQueue{newMq}
+
+ assert.Equal(t, false, reflect.DeepEqual(oldRoute, newRoute))
+ assert.Equal(t, true, routeEqual(oldRoute, newRoute))
+ assert.Equal(t, true, routeEqual(nil, nil))
+ assert.Equal(t, false, routeEqual(nil, newRoute))
+ assert.Equal(t, false, routeEqual(oldRoute, nil))
+ assert.Equal(t, true, routeEqual(nil, []*v2.MessageQueue{}))
+}
diff --git a/golang/loadBalancer.go b/golang/loadBalancer.go
index db9cbd44..da2dd647 100644
--- a/golang/loadBalancer.go
+++ b/golang/loadBalancer.go
@@ -31,6 +31,7 @@ import (
type PublishingLoadBalancer interface {
TakeMessageQueueByMessageGroup(messageGroup *string)
([]*v2.MessageQueue, error)
TakeMessageQueues(excluded sync.Map, count int) ([]*v2.MessageQueue,
error)
+ CopyAndUpdate([]*v2.MessageQueue) PublishingLoadBalancer
}
type publishingLoadBalancer struct {
@@ -119,8 +120,16 @@ func (plb *publishingLoadBalancer)
TakeMessageQueues(excluded sync.Map, count in
return candidates, nil
}
+func (plb *publishingLoadBalancer) CopyAndUpdate(messageQueues
[]*v2.MessageQueue) PublishingLoadBalancer {
+ return &publishingLoadBalancer{
+ messageQueues: messageQueues,
+ index: plb.index,
+ }
+}
+
type SubscriptionLoadBalancer interface {
TakeMessageQueue() (*v2.MessageQueue, error)
+ CopyAndUpdate([]*v2.MessageQueue) SubscriptionLoadBalancer
}
type subscriptionLoadBalancer struct {
@@ -147,3 +156,10 @@ func (slb *subscriptionLoadBalancer) TakeMessageQueue()
(*v2.MessageQueue, error
selectMessageQueue := slb.messageQueues[idx]
return selectMessageQueue, nil
}
+
+func (slb *subscriptionLoadBalancer) CopyAndUpdate(messageQueues
[]*v2.MessageQueue) SubscriptionLoadBalancer {
+ return &subscriptionLoadBalancer{
+ messageQueues: messageQueues,
+ index: slb.index,
+ }
+}