This is an automated email from the ASF dual-hosted git repository.

huzongtang pushed a commit to branch native
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git


The following commit(s) were added to refs/heads/native by this push:
     new 884475f  [ISSUE #95] Support multiple NameServer (#96)
884475f is described below

commit 884475fd06b71942f8463074cdb2cdb8a94a5542
Author: xujianhai666 <[email protected]>
AuthorDate: Mon Jul 8 14:19:28 2019 +0800

    [ISSUE #95] Support multiple NameServer (#96)
    
    * fix message properties. fix #65
    
    * simplify commit
    
    * simplify commit
    
    * Support multiple NameServer. fix #95
    
    * add retry on queryTopicRouteInfoFromServer
    
    * move pkg to kernel
---
 examples/consumer/interceptor/main.go |   2 +-
 examples/consumer/simple/main.go      |   2 +-
 examples/producer/interceptor/main.go |   2 +-
 examples/producer/simple/main.go      |   2 +-
 go.mod                                |   1 +
 go.sum                                |   3 +
 internal/consumer/push_consumer.go    |  19 ++----
 internal/kernel/namesrv.go            | 107 ++++++++++++++++++++++++++++++++++
 internal/kernel/namesrv_test.go       |  63 ++++++++++++++++++++
 internal/kernel/route.go              |  24 ++++++--
 internal/producer/producer.go         |  21 ++-----
 primitive/options.go                  |   6 +-
 utils/errors.go                       |   4 +-
 utils/string.go                       |  21 -------
 utils/string_test.go                  |  30 +---------
 15 files changed, 217 insertions(+), 90 deletions(-)

diff --git a/examples/consumer/interceptor/main.go 
b/examples/consumer/interceptor/main.go
index 01fedc3..9db45a8 100644
--- a/examples/consumer/interceptor/main.go
+++ b/examples/consumer/interceptor/main.go
@@ -28,7 +28,7 @@ import (
 )
 
 func main() {
-       c, _ := consumer.NewPushConsumer("testGroup", "127.0.0.1:9876",
+       c, _ := consumer.NewPushConsumer("testGroup", 
[]string{"127.0.0.1:9876"},
                primitive.WithConsumerModel(primitive.Clustering),
                
primitive.WithConsumeFromWhere(primitive.ConsumeFromFirstOffset),
                primitive.WithChainConsumerInterceptor(UserFistInterceptor(), 
UserSecondInterceptor()))
diff --git a/examples/consumer/simple/main.go b/examples/consumer/simple/main.go
index 70bbbd4..3bcf702 100644
--- a/examples/consumer/simple/main.go
+++ b/examples/consumer/simple/main.go
@@ -27,7 +27,7 @@ import (
 )
 
 func main() {
-       c, _ := consumer.NewPushConsumer("testGroup", "127.0.0.1:9876")
+       c, _ := consumer.NewPushConsumer("testGroup", 
[]string{"127.0.0.1:9876"})
        err := c.Subscribe("TopicTest", primitive.MessageSelector{}, func(ctx 
*primitive.ConsumeMessageContext,
                msgs []*primitive.MessageExt) (primitive.ConsumeResult, error) {
                fmt.Println("subscribe callback: %v", msgs)
diff --git a/examples/producer/interceptor/main.go 
b/examples/producer/interceptor/main.go
index c70eab3..a81a181 100644
--- a/examples/producer/interceptor/main.go
+++ b/examples/producer/interceptor/main.go
@@ -29,7 +29,7 @@ import (
 )
 
 func main() {
-       nameServerAddr := "127.0.0.1:9876"
+       nameServerAddr := []string{"127.0.0.1:9876"}
        p, _ := producer.NewProducer(nameServerAddr, primitive.WithRetry(2),
                primitive.WithChainProducerInterceptor(UserFirstInterceptor(), 
UserSecondInterceptor()))
        err := p.Start()
diff --git a/examples/producer/simple/main.go b/examples/producer/simple/main.go
index 39c885d..542ec58 100644
--- a/examples/producer/simple/main.go
+++ b/examples/producer/simple/main.go
@@ -28,7 +28,7 @@ import (
 
 // Package main implements a simple producer to send message.
 func main() {
-       nameServerAddr := "127.0.0.1:9876"
+       nameServerAddr := []string{"127.0.0.1:9876"}
        p, _ := producer.NewProducer(nameServerAddr, primitive.WithRetry(2))
        err := p.Start()
        if err != nil {
diff --git a/go.mod b/go.mod
index f8a9967..6fd011e 100644
--- a/go.mod
+++ b/go.mod
@@ -4,6 +4,7 @@ go 1.12
 
 require (
        github.com/emirpasic/gods v1.12.0
+       github.com/pkg/errors v0.8.1
        github.com/sirupsen/logrus v1.4.1
        github.com/stretchr/testify v1.3.0
        github.com/tidwall/gjson v1.2.1
diff --git a/go.sum b/go.sum
index 6580f88..cf90957 100644
--- a/go.sum
+++ b/go.sum
@@ -3,7 +3,10 @@ github.com/davecgh/go-spew v1.1.1 
h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c
 github.com/davecgh/go-spew v1.1.1/go.mod 
h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
 github.com/emirpasic/gods v1.12.0 
h1:QAUIPSaCu4G+POclxeqb3F+WPpdKqFGlw36+yOzGlrg=
 github.com/emirpasic/gods v1.12.0/go.mod 
h1:YfzfFFoVP/catgzJb4IKIqXjX78Ha8FMSDh3ymbK86o=
+github.com/konsorten/go-windows-terminal-sequences v1.0.1 
h1:mweAR1A6xJ3oS2pRaGiHgQ4OO8tzTaLawm8vnODuwDk=
 github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod 
h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
+github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I=
+github.com/pkg/errors v0.8.1/go.mod 
h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
 github.com/pmezard/go-difflib v1.0.0 
h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
 github.com/pmezard/go-difflib v1.0.0/go.mod 
h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
 github.com/sirupsen/logrus v1.4.1 
h1:GL2rEmy6nsikmW0r8opw9JIRScdMF5hA8cOYLH7In1k=
diff --git a/internal/consumer/push_consumer.go 
b/internal/consumer/push_consumer.go
index 8acbc4c..d78f428 100644
--- a/internal/consumer/push_consumer.go
+++ b/internal/consumer/push_consumer.go
@@ -19,17 +19,15 @@ package consumer
 
 import (
        "context"
-       "errors"
        "fmt"
        "math"
-       "os"
        "strconv"
        "time"
 
        "github.com/apache/rocketmq-client-go/internal/kernel"
        "github.com/apache/rocketmq-client-go/primitive"
        "github.com/apache/rocketmq-client-go/rlog"
-       "github.com/apache/rocketmq-client-go/utils"
+       "github.com/pkg/errors"
 )
 
 // In most scenarios, this is the mostly recommended usage to consume messages.
@@ -63,24 +61,19 @@ type pushConsumer struct {
        interceptor primitive.CInterceptor
 }
 
-func NewPushConsumer(consumerGroup string, nameServerAddr string, opts 
...*primitive.ConsumerOption) (PushConsumer, error) {
-       if err := utils.VerifyIP(nameServerAddr); err != nil {
-               return nil, err
-       }
-       if nameServerAddr == "" {
-               rlog.Fatal("opts.NameServerAddr can't be empty")
-       }
-       err := os.Setenv(kernel.EnvNameServerAddr, nameServerAddr)
+func NewPushConsumer(consumerGroup string, nameServerAddrs []string, opts 
...*primitive.ConsumerOption) (PushConsumer, error) {
+       srvs, err := kernel.NewNamesrv(nameServerAddrs...)
        if err != nil {
-               rlog.Fatal("set env=EnvNameServerAddr error: %s ", err.Error())
+               return nil, errors.Wrap(err, "new Namesrv failed.")
        }
+       kernel.RegisterNamsrv(srvs)
 
        pushOpts := primitive.DefaultPushConsumerOptions()
        for _, op := range opts {
                op.Apply(&pushOpts)
        }
 
-       pushOpts.NameServerAddr = nameServerAddr
+       pushOpts.NameServerAddrs = nameServerAddrs
 
        dc := &defaultConsumer{
                consumerGroup:  consumerGroup,
diff --git a/internal/kernel/namesrv.go b/internal/kernel/namesrv.go
new file mode 100644
index 0000000..7f29a8b
--- /dev/null
+++ b/internal/kernel/namesrv.go
@@ -0,0 +1,107 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package kernel
+
+import (
+       "errors"
+       "regexp"
+       "strings"
+       "sync"
+)
+
+var (
+       ipRegex, _ = 
regexp.Compile(`^((25[0-5]|2[0-4]\d|((1\d{2})|([1-9]?\d)))\.){3}(25[0-5]|2[0-4]\d|((1\d{2})|([1-9]?\d)))`)
+
+       ErrNoNameserver = errors.New("nameServerAddrs can't be empty.")
+       ErrMultiIP      = errors.New("multiple IP addr does not support")
+       ErrIllegalIP    = errors.New("IP addr error")
+)
+
+// Namesrvs rocketmq namesrv instance.
+type Namesrvs struct {
+       // namesrv addr list
+       srvs []string
+
+       // lock for getNamesrv in case of update index race condition
+       lock sync.Locker
+
+       // index indicate the next position for getNamesrv
+       index int
+}
+
+// NewNamesrv init Namesrv from namesrv addr string.
+func NewNamesrv(s ...string) (*Namesrvs, error) {
+       if len(s) == 0 {
+               return nil, ErrNoNameserver
+       }
+
+       ss := s
+       if len(ss) == 1 {
+               // compatible with multi server env string: "a;b;c"
+               ss = strings.Split(s[0], ";")
+       }
+
+       for _, srv := range ss {
+               if err := verifyIP(srv); err != nil {
+                       return nil, err
+               }
+       }
+
+       return &Namesrvs{
+               srvs: ss,
+               lock: new(sync.Mutex),
+       }, nil
+}
+
+// GetNamesrv return namesrv using round-robin strategy.
+func (s *Namesrvs) GetNamesrv() string {
+       s.lock.Lock()
+       defer s.lock.Unlock()
+
+       addr := s.srvs[s.index]
+       index := s.index + 1
+       if index < 0 {
+               index = -index
+       }
+       index %= len(s.srvs)
+       s.index = index
+       return addr
+}
+
+func (s *Namesrvs) Size() int {
+       return len(s.srvs)
+}
+
+func (s *Namesrvs) String() string {
+       return strings.Join(s.srvs, ";")
+}
+
+func verifyIP(ip string) error {
+       if strings.Contains(ip, ";") {
+               return ErrMultiIP
+       }
+       ips := ipRegex.FindAllString(ip, -1)
+       if len(ips) == 0 {
+               return ErrIllegalIP
+       }
+
+       if len(ips) > 1 {
+               return ErrMultiIP
+       }
+       return nil
+}
diff --git a/internal/kernel/namesrv_test.go b/internal/kernel/namesrv_test.go
new file mode 100644
index 0000000..accae0a
--- /dev/null
+++ b/internal/kernel/namesrv_test.go
@@ -0,0 +1,63 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package kernel
+
+import (
+       "testing"
+
+       "github.com/stretchr/testify/assert"
+)
+
+func TestVerifyIP(t *testing.T) {
+       IPs := "127.0.0.1:9876"
+       err := verifyIP(IPs)
+       assert.Nil(t, err)
+
+       IPs = "12.24.123.243:10911"
+       err = verifyIP(IPs)
+       assert.Nil(t, err)
+
+       IPs = "xa2.0.0.1:9876"
+       err = verifyIP(IPs)
+       assert.Equal(t, "IP addr error", err.Error())
+
+       IPs = "333.0.0.1:9876"
+       err = verifyIP(IPs)
+       assert.Equal(t, "IP addr error", err.Error())
+
+       IPs = "127.0.0.1:9876;12.24.123.243:10911"
+       err = verifyIP(IPs)
+       assert.Equal(t, "multiple IP addr does not support", err.Error())
+}
+
+// TestSelector test roundrobin selector in namesrv
+func TestSelector(t *testing.T) {
+       srvs := []string{"127.0.0.1:9876", "127.0.0.1:9879", 
"12.24.123.243:10911", "12.24.123.243:10915"}
+       namesrv, err := NewNamesrv(srvs...)
+       assert.Nil(t, err)
+
+       assert.Equal(t, srvs[0], namesrv.GetNamesrv())
+       assert.Equal(t, srvs[1], namesrv.GetNamesrv())
+       assert.Equal(t, srvs[2], namesrv.GetNamesrv())
+       assert.Equal(t, srvs[3], namesrv.GetNamesrv())
+       assert.Equal(t, srvs[0], namesrv.GetNamesrv())
+       assert.Equal(t, srvs[1], namesrv.GetNamesrv())
+       assert.Equal(t, srvs[2], namesrv.GetNamesrv())
+       assert.Equal(t, srvs[3], namesrv.GetNamesrv())
+       assert.Equal(t, srvs[0], namesrv.GetNamesrv())
+}
diff --git a/internal/kernel/route.go b/internal/kernel/route.go
index 416d7b8..6e012c5 100644
--- a/internal/kernel/route.go
+++ b/internal/kernel/route.go
@@ -21,7 +21,6 @@ import (
        "encoding/json"
        "errors"
        "math/rand"
-       "os"
        "sort"
        "strconv"
        "strings"
@@ -48,8 +47,14 @@ const (
 var (
        ErrTopicNotExist = errors.New("topic not exist")
        nameSrvClient    = remote.NewRemotingClient()
+
+       nameSrvs *Namesrvs
 )
 
+func RegisterNamsrv(s *Namesrvs) {
+       nameSrvs = s
+}
+
 var (
        // brokerName -> *BrokerData
        brokerAddressesMap sync.Map
@@ -261,10 +266,21 @@ func queryTopicRouteInfoFromServer(topic string) 
(*TopicRouteData, error) {
        request := &GetRouteInfoRequest{
                Topic: topic,
        }
-       rc := remote.NewRemotingCommand(ReqGetRouteInfoByTopic, request, nil)
-       response, err := nameSrvClient.InvokeSync(getNameServerAddress(), rc, 
requestTimeout)
 
+       var (
+               response *remote.RemotingCommand
+               err error
+       )
+       for i := 0; i < nameSrvs.Size(); i++ {
+               rc := remote.NewRemotingCommand(ReqGetRouteInfoByTopic, 
request, nil)
+               response, err = 
nameSrvClient.InvokeSync(getNameServerAddress(), rc, requestTimeout)
+
+               if err != nil {
+                       continue
+               }
+       }
        if err != nil {
+               rlog.Errorf("connect to namesrv: %v failed.", nameSrvs)
                return nil, err
        }
 
@@ -372,7 +388,7 @@ func routeData2PublishInfo(topic string, data 
*TopicRouteData) *TopicPublishInfo
 }
 
 func getNameServerAddress() string {
-       return os.Getenv(EnvNameServerAddr)
+       return nameSrvs.GetNamesrv()
 }
 
 // TopicRouteData TopicRouteData
diff --git a/internal/producer/producer.go b/internal/producer/producer.go
index a559a98..5be553e 100644
--- a/internal/producer/producer.go
+++ b/internal/producer/producer.go
@@ -19,9 +19,7 @@ package producer
 
 import (
        "context"
-       "errors"
        "fmt"
-       "os"
        "sync"
        "sync/atomic"
        "time"
@@ -30,7 +28,7 @@ import (
        "github.com/apache/rocketmq-client-go/internal/remote"
        "github.com/apache/rocketmq-client-go/primitive"
        "github.com/apache/rocketmq-client-go/rlog"
-       "github.com/apache/rocketmq-client-go/utils"
+       "github.com/pkg/errors"
 )
 
 type Producer interface {
@@ -40,24 +38,18 @@ type Producer interface {
        SendOneWay(context.Context, *primitive.Message) error
 }
 
-func NewProducer(nameServerAddr string, opts ...*primitive.ProducerOption) 
(Producer, error) {
-       if err := utils.VerifyIP(nameServerAddr); err != nil {
-               return nil, err
-       }
-
-       if nameServerAddr == "" {
-               rlog.Fatal("nameServerAddr can't be empty")
-       }
-       err := os.Setenv(kernel.EnvNameServerAddr, nameServerAddr)
+func NewProducer(nameServerAddrs []string, opts ...*primitive.ProducerOption) 
(Producer, error) {
+       srvs, err := kernel.NewNamesrv(nameServerAddrs...)
        if err != nil {
-               rlog.Fatal("set env=EnvNameServerAddr error: %s ", err.Error())
+               return nil, errors.Wrap(err, "new Namesrv failed.")
        }
+       kernel.RegisterNamsrv(srvs)
 
        popts := primitive.DefaultProducerOptions()
        for _, opt := range opts {
                opt.Apply(&popts)
        }
-       popts.NameServerAddr = nameServerAddr
+       popts.NameServerAddrs = nameServerAddrs
 
        producer := &defaultProducer{
                group:   "default",
@@ -289,4 +281,3 @@ func (p *defaultProducer) IsPublishTopicNeedUpdate(topic 
string) bool {
 func (p *defaultProducer) IsUnitMode() bool {
        return false
 }
-
diff --git a/primitive/options.go b/primitive/options.go
index 0e40415..2cbf63c 100644
--- a/primitive/options.go
+++ b/primitive/options.go
@@ -30,7 +30,7 @@ type ProducerOptions struct {
        Interceptors []PInterceptor
 
        ClientOption
-       NameServerAddr           string
+       NameServerAddrs          []string
        GroupName                string
        RetryTimesWhenSendFailed int
        UnitMode                 bool
@@ -79,7 +79,7 @@ func WithRetry(retries int) *ProducerOption {
 
 type ConsumerOptions struct {
        ClientOption
-       NameServerAddr string
+       NameServerAddrs []string
 
        /**
         * Backtracking consumption time with second precision. Time format is
@@ -217,7 +217,7 @@ func (opt *ClientOption) String() string {
 }
 
 type ClientOption struct {
-       NameServerAddr    string
+       NameServerAddrs   string
        ClientIP          string
        InstanceName      string
        UnitMode          bool
diff --git a/utils/errors.go b/utils/errors.go
index 0d96d97..a3c7ead 100644
--- a/utils/errors.go
+++ b/utils/errors.go
@@ -17,7 +17,9 @@ limitations under the License.
 
 package utils
 
-import "github.com/apache/rocketmq-client-go/rlog"
+import (
+       "github.com/apache/rocketmq-client-go/rlog"
+)
 
 func CheckError(action string, err error) {
        if err != nil {
diff --git a/utils/string.go b/utils/string.go
index 427a8a5..a1f9950 100644
--- a/utils/string.go
+++ b/utils/string.go
@@ -18,14 +18,7 @@ limitations under the License.
 package utils
 
 import (
-       "errors"
        "fmt"
-       "regexp"
-       "strings"
-)
-
-var (
-       ipRegex, _ = 
regexp.Compile(`^((25[0-5]|2[0-4]\d|((1\d{2})|([1-9]?\d)))\.){3}(25[0-5]|2[0-4]\d|((1\d{2})|([1-9]?\d)))`)
 )
 
 // HashString hashes a string to a unique hashcode.
@@ -48,17 +41,3 @@ func StrJoin(str, key string, value interface{}) string {
        return str + key + ": " + fmt.Sprint(value) + ", "
 }
 
-func VerifyIP(ip string) error {
-       if strings.Contains(ip, ";") {
-               return errors.New("multiple IP addr does not support")
-       }
-       ips := ipRegex.FindAllString(ip, -1)
-       if len(ips) == 0 {
-               return errors.New("IP addr error")
-       }
-
-       if len(ips) > 1 {
-               return errors.New("multiple IP addr does not support")
-       }
-       return nil
-}
diff --git a/utils/string_test.go b/utils/string_test.go
index e0a9f70..5b55d23 100644
--- a/utils/string_test.go
+++ b/utils/string_test.go
@@ -1,29 +1 @@
-package utils
-
-import (
-       "testing"
-
-       "github.com/stretchr/testify/assert"
-)
-
-func TestVerifyIP(t *testing.T) {
-       IPs := "127.0.0.1:9876"
-       err := VerifyIP(IPs)
-       assert.Nil(t, err)
-
-       IPs = "12.24.123.243:10911"
-       err = VerifyIP(IPs)
-       assert.Nil(t, err)
-
-       IPs = "xa2.0.0.1:9876"
-       err = VerifyIP(IPs)
-       assert.Equal(t, "IP addr error", err.Error())
-
-       IPs = "333.0.0.1:9876"
-       err = VerifyIP(IPs)
-       assert.Equal(t, "IP addr error", err.Error())
-
-       IPs = "127.0.0.1:9876;12.24.123.243:10911"
-       err = VerifyIP(IPs)
-       assert.Equal(t, "multiple IP addr does not support", err.Error())
-}
+package utils
\ No newline at end of file

Reply via email to