This is an automated email from the ASF dual-hosted git repository.

huzongtang pushed a commit to branch native
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git


The following commit(s) were added to refs/heads/native by this push:
     new e48203d  update defaultConsumer.client init AND add push_consumer testing (#156)
e48203d is described below

commit e48203df5a6546e414657e470267f81199243828
Author: cloes <wayne511200...@163.com>
AuthorDate: Thu Aug 15 15:22:26 2019 +0800

    update defaultConsumer.client init AND add push_consumer testing (#156)
    
    * update mockgen command line,in order to fix generate file "import cycle not allowed" problem
    
    * update defaultConsumer.client init AND add push_consumer testing
    
    * fix bug:UpdateTopicRouteInfo() now can be called anytime
    
    * fix bug:fix defaultConsumer.client init problem
    
    * fix bug:recommit after go fmt
---
 consumer/consumer.go           |  2 +-
 consumer/push_consumer.go      |  1 +
 consumer/push_consumer_test.go | 55 ++++++++++++++++++++++++++++++++++++++++++
 3 files changed, 57 insertions(+), 1 deletion(-)

diff --git a/consumer/consumer.go b/consumer/consumer.go
index 56836ae..b5bf22f 100644
--- a/consumer/consumer.go
+++ b/consumer/consumer.go
@@ -275,7 +275,7 @@ func (dc *defaultConsumer) start() error {
                dc.subscriptionDataTable.Store(retryTopic, sub)
        }
 
-       dc.client = internal.GetOrNewRocketMQClient(dc.option.ClientOptions, nil)
+       //dc.client = internal.GetOrNewRocketMQClient(dc.option.ClientOptions, nil)
        if dc.model == Clustering {
                dc.option.ChangeInstanceNameToPID()
                dc.storage = NewRemoteOffsetStore(dc.consumerGroup, dc.client)
diff --git a/consumer/push_consumer.go b/consumer/push_consumer.go
index 88ab362..3b9747c 100644
--- a/consumer/push_consumer.go
+++ b/consumer/push_consumer.go
@@ -67,6 +67,7 @@ func NewPushConsumer(opts ...Option) (*pushConsumer, error) {
        internal.RegisterNamsrv(srvs)
 
        dc := &defaultConsumer{
+               client:         internal.GetOrNewRocketMQClient(defaultOpts.ClientOptions, nil),
                consumerGroup:  defaultOpts.GroupName,
                cType:          _PushConsume,
                state:          internal.StateCreateJust,
diff --git a/consumer/push_consumer_test.go b/consumer/push_consumer_test.go
index f7bc454..2f4dbeb 100644
--- a/consumer/push_consumer_test.go
+++ b/consumer/push_consumer_test.go
@@ -16,3 +16,58 @@ limitations under the License.
 */
 
 package consumer
+
+import (
+       "context"
+       "fmt"
+       "github.com/apache/rocketmq-client-go/internal"
+       "github.com/apache/rocketmq-client-go/primitive"
+       "github.com/golang/mock/gomock"
+       . "github.com/smartystreets/goconvey/convey"
+       "testing"
+)
+
+func mockB4Start(c *pushConsumer) {
+       c.topicSubscribeInfoTable.Store("TopicTest", []*primitive.MessageQueue{})
+}
+
+func TestStart(t *testing.T) {
+       Convey("test Start method", t, func() {
+               c, _ := NewPushConsumer(
+                       WithGroupName("testGroup"),
+                       WithNameServer([]string{"127.0.0.1:9876"}),
+               )
+
+               ctrl := gomock.NewController(t)
+               defer ctrl.Finish()
+
+               client := internal.NewMockRMQClient(ctrl)
+               c.client = client
+
+               err := c.Subscribe("TopicTest", MessageSelector{}, func(ctx context.Context,
+                       msgs ...*primitive.MessageExt) (ConsumeResult, error) {
+                       fmt.Printf("subscribe callback: %v \n", msgs)
+                       return ConsumeSuccess, nil
+               })
+
+               client.EXPECT().ClientID().Return("127.0.0.1@DEFAULT")
+               client.EXPECT().Start().Return()
+               client.EXPECT().RegisterConsumer(gomock.Any(), gomock.Any()).Return(nil)
+               client.EXPECT().UpdateTopicRouteInfo().AnyTimes().Return()
+
+               Convey("test topic route info not found", func() {
+                       client.EXPECT().Shutdown().Return()
+                       err = c.Start()
+                       So(err.Error(), ShouldContainSubstring, "route info not found")
+               })
+
+               Convey("test topic route info found", func() {
+                       client.EXPECT().RebalanceImmediately().Return()
+                       client.EXPECT().CheckClientInBroker().Return()
+                       client.EXPECT().SendHeartbeatToAllBrokerWithLock().Return()
+                       mockB4Start(c)
+                       err = c.Start()
+                       So(err, ShouldBeNil)
+               })
+       })
+}

Reply via email to