This is an automated email from the ASF dual-hosted git repository.

dinglei pushed a commit to branch native
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git


The following commit(s) were added to refs/heads/native by this push:
     new 5c00ac1  fix panic bugs when topic not exist (#63)
5c00ac1 is described below

commit 5c00ac16346864dd302e5de5e55c987b0b97e41c
Author: wenfeng <[email protected]>
AuthorDate: Tue May 21 09:59:59 2019 +0800

    fix panic bugs when topic not exist (#63)
---
 consumer/push_consumer.go | 15 +++++++++++++--
 examples/consumer/main.go |  2 +-
 examples/producer/main.go |  2 +-
 go.sum                    |  3 +++
 kernel/client.go          | 12 +++++++++++-
 producer/producer.go      |  8 ++++++--
 utils/string.go           | 26 +++++++++++++++++++++++++-
 utils/string_test.go      | 28 ++++++++++++++++++++++++++++
 8 files changed, 88 insertions(+), 8 deletions(-)

diff --git a/consumer/push_consumer.go b/consumer/push_consumer.go
index 284d14b..1cb5efd 100644
--- a/consumer/push_consumer.go
+++ b/consumer/push_consumer.go
@@ -20,6 +20,7 @@ package consumer
 import (
        "context"
        "errors"
+       "fmt"
        "github.com/apache/rocketmq-client-go/kernel"
        "github.com/apache/rocketmq-client-go/rlog"
        "github.com/apache/rocketmq-client-go/utils"
@@ -61,7 +62,10 @@ type pushConsumer struct {
        subscribedTopic              map[string]string
 }
 
-func NewPushConsumer(consumerGroup string, opt ConsumerOption) PushConsumer {
+func NewPushConsumer(consumerGroup string, opt ConsumerOption) (PushConsumer, 
error) {
+       if err := utils.VerifyIP(opt.NameServerAddr); err != nil {
+               return nil, err
+       }
        opt.InstanceName = "DEFAULT"
        opt.ClientIP = utils.LocalIP()
        if opt.NameServerAddr == "" {
@@ -109,7 +113,7 @@ func NewPushConsumer(consumerGroup string, opt 
ConsumerOption) PushConsumer {
        } else {
                p.submitToConsume = p.consumeMessageCurrently
        }
-       return p
+       return p, nil
 }
 
 func (pc *pushConsumer) Start() error {
@@ -158,6 +162,13 @@ func (pc *pushConsumer) Start() error {
        })
 
        pc.client.UpdateTopicRouteInfo()
+       for k := range pc.subscribedTopic {
+               _, exist := pc.topicSubscribeInfoTable.Load(k)
+               if !exist {
+                       pc.client.Shutdown()
+                       return fmt.Errorf("the topic=%s route info not found, 
it may not exist", k)
+               }
+       }
        pc.client.RebalanceImmediately()
        pc.client.CheckClientInBroker()
        pc.client.SendHeartbeatToAllBrokerWithLock()
diff --git a/examples/consumer/main.go b/examples/consumer/main.go
index 53e9cb5..f660d43 100644
--- a/examples/consumer/main.go
+++ b/examples/consumer/main.go
@@ -26,7 +26,7 @@ import (
 )
 
 func main() {
-       c := consumer.NewPushConsumer("testGroup", consumer.ConsumerOption{
+       c, _ := consumer.NewPushConsumer("testGroup", consumer.ConsumerOption{
                NameServerAddr: "127.0.0.1:9876",
                ConsumerModel:  consumer.Clustering,
                FromWhere:      consumer.ConsumeFromFirstOffset,
diff --git a/examples/producer/main.go b/examples/producer/main.go
index 83bd127..9cc626b 100644
--- a/examples/producer/main.go
+++ b/examples/producer/main.go
@@ -30,7 +30,7 @@ func main() {
                NameServerAddr:           "127.0.0.1:9876",
                RetryTimesWhenSendFailed: 2,
        }
-       p := producer.NewProducer(opt)
+       p, _ := producer.NewProducer(opt)
        err := p.Start()
        if err != nil {
                fmt.Printf("start producer error: %s", err.Error())
diff --git a/go.sum b/go.sum
index 07969a4..6580f88 100644
--- a/go.sum
+++ b/go.sum
@@ -1,14 +1,17 @@
 github.com/davecgh/go-spew v1.1.0/go.mod 
h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
+github.com/davecgh/go-spew v1.1.1 
h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
 github.com/davecgh/go-spew v1.1.1/go.mod 
h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
 github.com/emirpasic/gods v1.12.0 
h1:QAUIPSaCu4G+POclxeqb3F+WPpdKqFGlw36+yOzGlrg=
 github.com/emirpasic/gods v1.12.0/go.mod 
h1:YfzfFFoVP/catgzJb4IKIqXjX78Ha8FMSDh3ymbK86o=
 github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod 
h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
+github.com/pmezard/go-difflib v1.0.0 
h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
 github.com/pmezard/go-difflib v1.0.0/go.mod 
h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
 github.com/sirupsen/logrus v1.4.1 
h1:GL2rEmy6nsikmW0r8opw9JIRScdMF5hA8cOYLH7In1k=
 github.com/sirupsen/logrus v1.4.1/go.mod 
h1:ni0Sbl8bgC9z8RoU9G6nDWqqs/fq4eDPysMBDgk/93Q=
 github.com/stretchr/objx v0.1.0/go.mod 
h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
 github.com/stretchr/objx v0.1.1/go.mod 
h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
 github.com/stretchr/testify v1.2.2/go.mod 
h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
+github.com/stretchr/testify v1.3.0 
h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0Q=
 github.com/stretchr/testify v1.3.0/go.mod 
h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
 github.com/tidwall/gjson v1.2.1 h1:j0efZLrZUvNerEf6xqoi0NjWMK5YlLrR7Guo/dxY174=
 github.com/tidwall/gjson v1.2.1/go.mod 
h1:c/nTNbUr0E0OrXEhq1pwa8iEgc2DOt4ZZqAt1HtCkPA=
diff --git a/kernel/client.go b/kernel/client.go
index b91ab1f..9dbecc6 100644
--- a/kernel/client.go
+++ b/kernel/client.go
@@ -47,7 +47,7 @@ const (
        _PersistOffset = 5 * time.Second
 
        // Rebalance interval
-       _RebalanceInterval = 100 * time.Millisecond
+       _RebalanceInterval = 10 * time.Second
 )
 
 var (
@@ -181,6 +181,10 @@ func (c *RMQClient) Start() {
        })
 }
 
+func (c *RMQClient) Shutdown() {
+       // TODO
+}
+
 func (c *RMQClient) ClientID() string {
        id := c.option.ClientIP + "@" + c.option.InstanceName
        if c.option.UnitName != "" {
@@ -441,6 +445,9 @@ func (c *RMQClient) RebalanceImmediately() {
 }
 
 func (c *RMQClient) UpdatePublishInfo(topic string, data *TopicRouteData) {
+       if data == nil {
+               return
+       }
        if !c.isNeedUpdatePublishInfo(topic) {
                return
        }
@@ -467,6 +474,9 @@ func (c *RMQClient) isNeedUpdatePublishInfo(topic string) 
bool {
 }
 
 func (c *RMQClient) UpdateSubscribeInfo(topic string, data *TopicRouteData) {
+       if data == nil {
+               return
+       }
        if !c.isNeedUpdateSubscribeInfo(topic) {
                return
        }
diff --git a/producer/producer.go b/producer/producer.go
index 439c585..8bb0bf8 100644
--- a/producer/producer.go
+++ b/producer/producer.go
@@ -24,6 +24,7 @@ import (
        "github.com/apache/rocketmq-client-go/kernel"
        "github.com/apache/rocketmq-client-go/remote"
        "github.com/apache/rocketmq-client-go/rlog"
+       "github.com/apache/rocketmq-client-go/utils"
        "os"
        "sync"
        "sync/atomic"
@@ -37,7 +38,10 @@ type Producer interface {
        SendOneWay(context.Context, *kernel.Message) error
 }
 
-func NewProducer(opt ProducerOptions) Producer {
+func NewProducer(opt ProducerOptions) (Producer, error) {
+       if err := utils.VerifyIP(opt.NameServerAddr); err != nil {
+               return nil, err
+       }
        if opt.RetryTimesWhenSendFailed == 0 {
                opt.RetryTimesWhenSendFailed = 2
        }
@@ -52,7 +56,7 @@ func NewProducer(opt ProducerOptions) Producer {
                group:   "default",
                client:  kernel.GetOrNewRocketMQClient(opt.ClientOption),
                options: opt,
-       }
+       }, nil
 }
 
 type defaultProducer struct {
diff --git a/utils/string.go b/utils/string.go
index 6a74808..427a8a5 100644
--- a/utils/string.go
+++ b/utils/string.go
@@ -17,7 +17,16 @@ limitations under the License.
 
 package utils
 
-import "fmt"
+import (
+       "errors"
+       "fmt"
+       "regexp"
+       "strings"
+)
+
+var (
+       ipRegex, _ = 
regexp.Compile(`^((25[0-5]|2[0-4]\d|((1\d{2})|([1-9]?\d)))\.){3}(25[0-5]|2[0-4]\d|((1\d{2})|([1-9]?\d)))`)
+)
 
 // HashString hashes a string to a unique hashcode.
 func HashString(s string) int {
@@ -38,3 +47,18 @@ func StrJoin(str, key string, value interface{}) string {
 
        return str + key + ": " + fmt.Sprint(value) + ", "
 }
+
+func VerifyIP(ip string) error {
+       if strings.Contains(ip, ";") {
+               return errors.New("multiple IP addr does not support")
+       }
+       ips := ipRegex.FindAllString(ip, -1)
+       if len(ips) == 0 {
+               return errors.New("IP addr error")
+       }
+
+       if len(ips) > 1 {
+               return errors.New("multiple IP addr does not support")
+       }
+       return nil
+}
diff --git a/utils/string_test.go b/utils/string_test.go
new file mode 100644
index 0000000..f6d2fe6
--- /dev/null
+++ b/utils/string_test.go
@@ -0,0 +1,28 @@
+package utils
+
+import (
+       "github.com/stretchr/testify/assert"
+       "testing"
+)
+
+func TestVerifyIP(t *testing.T) {
+       IPs := "127.0.0.1:9876"
+       err := VerifyIP(IPs)
+       assert.Nil(t, err)
+
+       IPs = "12.24.123.243:10911"
+       err = VerifyIP(IPs)
+       assert.Nil(t, err)
+
+       IPs = "xa2.0.0.1:9876"
+       err = VerifyIP(IPs)
+       assert.Equal(t, "IP addr error", err.Error())
+
+       IPs = "333.0.0.1:9876"
+       err = VerifyIP(IPs)
+       assert.Equal(t, "IP addr error", err.Error())
+
+       IPs = "127.0.0.1:9876;12.24.123.243:10911"
+       err = VerifyIP(IPs)
+       assert.Equal(t, "multiple IP addr does not support", err.Error())
+}

Reply via email to