This is an automated email from the ASF dual-hosted git repository.
huzongtang pushed a commit to branch native
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git
The following commit(s) were added to refs/heads/native by this push:
new 884475f [ISSUE #95] Support multiple NameServe (#96)
884475f is described below
commit 884475fd06b71942f8463074cdb2cdb8a94a5542
Author: xujianhai666 <[email protected]>
AuthorDate: Mon Jul 8 14:19:28 2019 +0800
[ISSUE #95] Support multiple NameServe (#96)
* fix message properties. fix #65
* simplify commit
* simplify commit
* Support multiple NameServer. fix #95
* add retry on queryTopicRouteInfoFromServer
* move pkg to kernel
---
examples/consumer/interceptor/main.go | 2 +-
examples/consumer/simple/main.go | 2 +-
examples/producer/interceptor/main.go | 2 +-
examples/producer/simple/main.go | 2 +-
go.mod | 1 +
go.sum | 3 +
internal/consumer/push_consumer.go | 19 ++----
internal/kernel/namesrv.go | 107 ++++++++++++++++++++++++++++++++++
internal/kernel/namesrv_test.go | 63 ++++++++++++++++++++
internal/kernel/route.go | 24 ++++++--
internal/producer/producer.go | 21 ++-----
primitive/options.go | 6 +-
utils/errors.go | 4 +-
utils/string.go | 21 -------
utils/string_test.go | 30 +---------
15 files changed, 217 insertions(+), 90 deletions(-)
diff --git a/examples/consumer/interceptor/main.go
b/examples/consumer/interceptor/main.go
index 01fedc3..9db45a8 100644
--- a/examples/consumer/interceptor/main.go
+++ b/examples/consumer/interceptor/main.go
@@ -28,7 +28,7 @@ import (
)
func main() {
- c, _ := consumer.NewPushConsumer("testGroup", "127.0.0.1:9876",
+ c, _ := consumer.NewPushConsumer("testGroup",
[]string{"127.0.0.1:9876"},
primitive.WithConsumerModel(primitive.Clustering),
primitive.WithConsumeFromWhere(primitive.ConsumeFromFirstOffset),
primitive.WithChainConsumerInterceptor(UserFistInterceptor(),
UserSecondInterceptor()))
diff --git a/examples/consumer/simple/main.go b/examples/consumer/simple/main.go
index 70bbbd4..3bcf702 100644
--- a/examples/consumer/simple/main.go
+++ b/examples/consumer/simple/main.go
@@ -27,7 +27,7 @@ import (
)
func main() {
- c, _ := consumer.NewPushConsumer("testGroup", "127.0.0.1:9876")
+ c, _ := consumer.NewPushConsumer("testGroup",
[]string{"127.0.0.1:9876"})
err := c.Subscribe("TopicTest", primitive.MessageSelector{}, func(ctx
*primitive.ConsumeMessageContext,
msgs []*primitive.MessageExt) (primitive.ConsumeResult, error) {
fmt.Println("subscribe callback: %v", msgs)
diff --git a/examples/producer/interceptor/main.go
b/examples/producer/interceptor/main.go
index c70eab3..a81a181 100644
--- a/examples/producer/interceptor/main.go
+++ b/examples/producer/interceptor/main.go
@@ -29,7 +29,7 @@ import (
)
func main() {
- nameServerAddr := "127.0.0.1:9876"
+ nameServerAddr := []string{"127.0.0.1:9876"}
p, _ := producer.NewProducer(nameServerAddr, primitive.WithRetry(2),
primitive.WithChainProducerInterceptor(UserFirstInterceptor(),
UserSecondInterceptor()))
err := p.Start()
diff --git a/examples/producer/simple/main.go b/examples/producer/simple/main.go
index 39c885d..542ec58 100644
--- a/examples/producer/simple/main.go
+++ b/examples/producer/simple/main.go
@@ -28,7 +28,7 @@ import (
// Package main implements a simple producer to send message.
func main() {
- nameServerAddr := "127.0.0.1:9876"
+ nameServerAddr := []string{"127.0.0.1:9876"}
p, _ := producer.NewProducer(nameServerAddr, primitive.WithRetry(2))
err := p.Start()
if err != nil {
diff --git a/go.mod b/go.mod
index f8a9967..6fd011e 100644
--- a/go.mod
+++ b/go.mod
@@ -4,6 +4,7 @@ go 1.12
require (
github.com/emirpasic/gods v1.12.0
+ github.com/pkg/errors v0.8.1
github.com/sirupsen/logrus v1.4.1
github.com/stretchr/testify v1.3.0
github.com/tidwall/gjson v1.2.1
diff --git a/go.sum b/go.sum
index 6580f88..cf90957 100644
--- a/go.sum
+++ b/go.sum
@@ -3,7 +3,10 @@ github.com/davecgh/go-spew v1.1.1
h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c
github.com/davecgh/go-spew v1.1.1/go.mod
h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/emirpasic/gods v1.12.0
h1:QAUIPSaCu4G+POclxeqb3F+WPpdKqFGlw36+yOzGlrg=
github.com/emirpasic/gods v1.12.0/go.mod
h1:YfzfFFoVP/catgzJb4IKIqXjX78Ha8FMSDh3ymbK86o=
+github.com/konsorten/go-windows-terminal-sequences v1.0.1
h1:mweAR1A6xJ3oS2pRaGiHgQ4OO8tzTaLawm8vnODuwDk=
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod
h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
+github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I=
+github.com/pkg/errors v0.8.1/go.mod
h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0
h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod
h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/sirupsen/logrus v1.4.1
h1:GL2rEmy6nsikmW0r8opw9JIRScdMF5hA8cOYLH7In1k=
diff --git a/internal/consumer/push_consumer.go
b/internal/consumer/push_consumer.go
index 8acbc4c..d78f428 100644
--- a/internal/consumer/push_consumer.go
+++ b/internal/consumer/push_consumer.go
@@ -19,17 +19,15 @@ package consumer
import (
"context"
- "errors"
"fmt"
"math"
- "os"
"strconv"
"time"
"github.com/apache/rocketmq-client-go/internal/kernel"
"github.com/apache/rocketmq-client-go/primitive"
"github.com/apache/rocketmq-client-go/rlog"
- "github.com/apache/rocketmq-client-go/utils"
+ "github.com/pkg/errors"
)
// In most scenarios, this is the mostly recommended usage to consume messages.
@@ -63,24 +61,19 @@ type pushConsumer struct {
interceptor primitive.CInterceptor
}
-func NewPushConsumer(consumerGroup string, nameServerAddr string, opts
...*primitive.ConsumerOption) (PushConsumer, error) {
- if err := utils.VerifyIP(nameServerAddr); err != nil {
- return nil, err
- }
- if nameServerAddr == "" {
- rlog.Fatal("opts.NameServerAddr can't be empty")
- }
- err := os.Setenv(kernel.EnvNameServerAddr, nameServerAddr)
+func NewPushConsumer(consumerGroup string, nameServerAddrs []string, opts
...*primitive.ConsumerOption) (PushConsumer, error) {
+ srvs, err := kernel.NewNamesrv(nameServerAddrs...)
if err != nil {
- rlog.Fatal("set env=EnvNameServerAddr error: %s ", err.Error())
+ return nil, errors.Wrap(err, "new Namesrv failed.")
}
+ kernel.RegisterNamsrv(srvs)
pushOpts := primitive.DefaultPushConsumerOptions()
for _, op := range opts {
op.Apply(&pushOpts)
}
- pushOpts.NameServerAddr = nameServerAddr
+ pushOpts.NameServerAddrs = nameServerAddrs
dc := &defaultConsumer{
consumerGroup: consumerGroup,
diff --git a/internal/kernel/namesrv.go b/internal/kernel/namesrv.go
new file mode 100644
index 0000000..7f29a8b
--- /dev/null
+++ b/internal/kernel/namesrv.go
@@ -0,0 +1,107 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package kernel
+
+import (
+ "errors"
+ "regexp"
+ "strings"
+ "sync"
+)
+
+var (
+ ipRegex, _ =
regexp.Compile(`^((25[0-5]|2[0-4]\d|((1\d{2})|([1-9]?\d)))\.){3}(25[0-5]|2[0-4]\d|((1\d{2})|([1-9]?\d)))`)
+
+ ErrNoNameserver = errors.New("nameServerAddrs can't be empty.")
+ ErrMultiIP = errors.New("multiple IP addr does not support")
+ ErrIllegalIP = errors.New("IP addr error")
+)
+
+// Namesrvs rocketmq namesrv instance.
+type Namesrvs struct {
+ // namesrv addr list
+ srvs []string
+
+ // lock for getNamesrv in case of update index race condition
+ lock sync.Locker
+
+ // index indicate the next position for getNamesrv
+ index int
+}
+
+// NewNamesrv init Namesrv from namesrv addr string.
+func NewNamesrv(s ...string) (*Namesrvs, error) {
+ if len(s) == 0 {
+ return nil, ErrNoNameserver
+ }
+
+ ss := s
+ if len(ss) == 1 {
+ // compatible with multi server env string: "a;b;c"
+ ss = strings.Split(s[0], ";")
+ }
+
+ for _, srv := range ss {
+ if err := verifyIP(srv); err != nil {
+ return nil, err
+ }
+ }
+
+ return &Namesrvs{
+ srvs: ss,
+ lock: new(sync.Mutex),
+ }, nil
+}
+
+// GetNamesrv return namesrv using round-robin strategy.
+func (s *Namesrvs) GetNamesrv() string {
+ s.lock.Lock()
+ defer s.lock.Unlock()
+
+ addr := s.srvs[s.index]
+ index := s.index + 1
+ if index < 0 {
+ index = -index
+ }
+ index %= len(s.srvs)
+ s.index = index
+ return addr
+}
+
+func (s *Namesrvs) Size() int {
+ return len(s.srvs)
+}
+
+func (s *Namesrvs) String() string {
+ return strings.Join(s.srvs, ";")
+}
+
+func verifyIP(ip string) error {
+ if strings.Contains(ip, ";") {
+ return ErrMultiIP
+ }
+ ips := ipRegex.FindAllString(ip, -1)
+ if len(ips) == 0 {
+ return ErrIllegalIP
+ }
+
+ if len(ips) > 1 {
+ return ErrMultiIP
+ }
+ return nil
+}
diff --git a/internal/kernel/namesrv_test.go b/internal/kernel/namesrv_test.go
new file mode 100644
index 0000000..accae0a
--- /dev/null
+++ b/internal/kernel/namesrv_test.go
@@ -0,0 +1,63 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package kernel
+
+import (
+ "testing"
+
+ "github.com/stretchr/testify/assert"
+)
+
+func TestVerifyIP(t *testing.T) {
+ IPs := "127.0.0.1:9876"
+ err := verifyIP(IPs)
+ assert.Nil(t, err)
+
+ IPs = "12.24.123.243:10911"
+ err = verifyIP(IPs)
+ assert.Nil(t, err)
+
+ IPs = "xa2.0.0.1:9876"
+ err = verifyIP(IPs)
+ assert.Equal(t, "IP addr error", err.Error())
+
+ IPs = "333.0.0.1:9876"
+ err = verifyIP(IPs)
+ assert.Equal(t, "IP addr error", err.Error())
+
+ IPs = "127.0.0.1:9876;12.24.123.243:10911"
+ err = verifyIP(IPs)
+ assert.Equal(t, "multiple IP addr does not support", err.Error())
+}
+
+// TestSelector test roundrobin selector in namesrv
+func TestSelector(t *testing.T) {
+ srvs := []string{"127.0.0.1:9876", "127.0.0.1:9879",
"12.24.123.243:10911", "12.24.123.243:10915"}
+ namesrv, err := NewNamesrv(srvs...)
+ assert.Nil(t, err)
+
+ assert.Equal(t, srvs[0], namesrv.GetNamesrv())
+ assert.Equal(t, srvs[1], namesrv.GetNamesrv())
+ assert.Equal(t, srvs[2], namesrv.GetNamesrv())
+ assert.Equal(t, srvs[3], namesrv.GetNamesrv())
+ assert.Equal(t, srvs[0], namesrv.GetNamesrv())
+ assert.Equal(t, srvs[1], namesrv.GetNamesrv())
+ assert.Equal(t, srvs[2], namesrv.GetNamesrv())
+ assert.Equal(t, srvs[3], namesrv.GetNamesrv())
+ assert.Equal(t, srvs[0], namesrv.GetNamesrv())
+}
diff --git a/internal/kernel/route.go b/internal/kernel/route.go
index 416d7b8..6e012c5 100644
--- a/internal/kernel/route.go
+++ b/internal/kernel/route.go
@@ -21,7 +21,6 @@ import (
"encoding/json"
"errors"
"math/rand"
- "os"
"sort"
"strconv"
"strings"
@@ -48,8 +47,14 @@ const (
var (
ErrTopicNotExist = errors.New("topic not exist")
nameSrvClient = remote.NewRemotingClient()
+
+ nameSrvs *Namesrvs
)
+func RegisterNamsrv(s *Namesrvs) {
+ nameSrvs = s
+}
+
var (
// brokerName -> *BrokerData
brokerAddressesMap sync.Map
@@ -261,10 +266,21 @@ func queryTopicRouteInfoFromServer(topic string)
(*TopicRouteData, error) {
request := &GetRouteInfoRequest{
Topic: topic,
}
- rc := remote.NewRemotingCommand(ReqGetRouteInfoByTopic, request, nil)
- response, err := nameSrvClient.InvokeSync(getNameServerAddress(), rc,
requestTimeout)
+ var (
+ response *remote.RemotingCommand
+ err error
+ )
+ for i := 0; i < nameSrvs.Size(); i++ {
+ rc := remote.NewRemotingCommand(ReqGetRouteInfoByTopic,
request, nil)
+ response, err =
nameSrvClient.InvokeSync(getNameServerAddress(), rc, requestTimeout)
+
+ if err != nil {
+ continue
+ }
+ }
if err != nil {
+ rlog.Errorf("connect to namesrv: %v failed.", nameSrvs)
return nil, err
}
@@ -372,7 +388,7 @@ func routeData2PublishInfo(topic string, data
*TopicRouteData) *TopicPublishInfo
}
func getNameServerAddress() string {
- return os.Getenv(EnvNameServerAddr)
+ return nameSrvs.GetNamesrv()
}
// TopicRouteData TopicRouteData
diff --git a/internal/producer/producer.go b/internal/producer/producer.go
index a559a98..5be553e 100644
--- a/internal/producer/producer.go
+++ b/internal/producer/producer.go
@@ -19,9 +19,7 @@ package producer
import (
"context"
- "errors"
"fmt"
- "os"
"sync"
"sync/atomic"
"time"
@@ -30,7 +28,7 @@ import (
"github.com/apache/rocketmq-client-go/internal/remote"
"github.com/apache/rocketmq-client-go/primitive"
"github.com/apache/rocketmq-client-go/rlog"
- "github.com/apache/rocketmq-client-go/utils"
+ "github.com/pkg/errors"
)
type Producer interface {
@@ -40,24 +38,18 @@ type Producer interface {
SendOneWay(context.Context, *primitive.Message) error
}
-func NewProducer(nameServerAddr string, opts ...*primitive.ProducerOption)
(Producer, error) {
- if err := utils.VerifyIP(nameServerAddr); err != nil {
- return nil, err
- }
-
- if nameServerAddr == "" {
- rlog.Fatal("nameServerAddr can't be empty")
- }
- err := os.Setenv(kernel.EnvNameServerAddr, nameServerAddr)
+func NewProducer(nameServerAddrs []string, opts ...*primitive.ProducerOption)
(Producer, error) {
+ srvs, err := kernel.NewNamesrv(nameServerAddrs...)
if err != nil {
- rlog.Fatal("set env=EnvNameServerAddr error: %s ", err.Error())
+ return nil, errors.Wrap(err, "new Namesrv failed.")
}
+ kernel.RegisterNamsrv(srvs)
popts := primitive.DefaultProducerOptions()
for _, opt := range opts {
opt.Apply(&popts)
}
- popts.NameServerAddr = nameServerAddr
+ popts.NameServerAddrs = nameServerAddrs
producer := &defaultProducer{
group: "default",
@@ -289,4 +281,3 @@ func (p *defaultProducer) IsPublishTopicNeedUpdate(topic
string) bool {
func (p *defaultProducer) IsUnitMode() bool {
return false
}
-
diff --git a/primitive/options.go b/primitive/options.go
index 0e40415..2cbf63c 100644
--- a/primitive/options.go
+++ b/primitive/options.go
@@ -30,7 +30,7 @@ type ProducerOptions struct {
Interceptors []PInterceptor
ClientOption
- NameServerAddr string
+ NameServerAddrs []string
GroupName string
RetryTimesWhenSendFailed int
UnitMode bool
@@ -79,7 +79,7 @@ func WithRetry(retries int) *ProducerOption {
type ConsumerOptions struct {
ClientOption
- NameServerAddr string
+ NameServerAddrs []string
/**
* Backtracking consumption time with second precision. Time format is
@@ -217,7 +217,7 @@ func (opt *ClientOption) String() string {
}
type ClientOption struct {
- NameServerAddr string
+ NameServerAddrs string
ClientIP string
InstanceName string
UnitMode bool
diff --git a/utils/errors.go b/utils/errors.go
index 0d96d97..a3c7ead 100644
--- a/utils/errors.go
+++ b/utils/errors.go
@@ -17,7 +17,9 @@ limitations under the License.
package utils
-import "github.com/apache/rocketmq-client-go/rlog"
+import (
+ "github.com/apache/rocketmq-client-go/rlog"
+)
func CheckError(action string, err error) {
if err != nil {
diff --git a/utils/string.go b/utils/string.go
index 427a8a5..a1f9950 100644
--- a/utils/string.go
+++ b/utils/string.go
@@ -18,14 +18,7 @@ limitations under the License.
package utils
import (
- "errors"
"fmt"
- "regexp"
- "strings"
-)
-
-var (
- ipRegex, _ =
regexp.Compile(`^((25[0-5]|2[0-4]\d|((1\d{2})|([1-9]?\d)))\.){3}(25[0-5]|2[0-4]\d|((1\d{2})|([1-9]?\d)))`)
)
// HashString hashes a string to a unique hashcode.
@@ -48,17 +41,3 @@ func StrJoin(str, key string, value interface{}) string {
return str + key + ": " + fmt.Sprint(value) + ", "
}
-func VerifyIP(ip string) error {
- if strings.Contains(ip, ";") {
- return errors.New("multiple IP addr does not support")
- }
- ips := ipRegex.FindAllString(ip, -1)
- if len(ips) == 0 {
- return errors.New("IP addr error")
- }
-
- if len(ips) > 1 {
- return errors.New("multiple IP addr does not support")
- }
- return nil
-}
diff --git a/utils/string_test.go b/utils/string_test.go
index e0a9f70..5b55d23 100644
--- a/utils/string_test.go
+++ b/utils/string_test.go
@@ -1,29 +1 @@
-package utils
-
-import (
- "testing"
-
- "github.com/stretchr/testify/assert"
-)
-
-func TestVerifyIP(t *testing.T) {
- IPs := "127.0.0.1:9876"
- err := VerifyIP(IPs)
- assert.Nil(t, err)
-
- IPs = "12.24.123.243:10911"
- err = VerifyIP(IPs)
- assert.Nil(t, err)
-
- IPs = "xa2.0.0.1:9876"
- err = VerifyIP(IPs)
- assert.Equal(t, "IP addr error", err.Error())
-
- IPs = "333.0.0.1:9876"
- err = VerifyIP(IPs)
- assert.Equal(t, "IP addr error", err.Error())
-
- IPs = "127.0.0.1:9876;12.24.123.243:10911"
- err = VerifyIP(IPs)
- assert.Equal(t, "multiple IP addr does not support", err.Error())
-}
+package utils
\ No newline at end of file