This is an automated email from the ASF dual-hosted git repository. huzongtang pushed a commit to branch native in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git
The following commit(s) were added to refs/heads/native by this push: new e48203d update defaultConsumer.client init AND add push_consumer testing (#156) e48203d is described below commit e48203df5a6546e414657e470267f81199243828 Author: cloes <wayne511200...@163.com> AuthorDate: Thu Aug 15 15:22:26 2019 +0800 update defaultConsumer.client init AND add push_consumer testing (#156) * update mockgen command line,in order to fix generate file "import cycle not allowed" problem * update defaultConsumer.client init AND add push_consumer testing * fix bug:UpdateTopicRouteInfo() now can be called anytime * fix bug:fix defaultConsumer.client init problem * fix bug:recommit after go fmt --- consumer/consumer.go | 2 +- consumer/push_consumer.go | 1 + consumer/push_consumer_test.go | 55 ++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 57 insertions(+), 1 deletion(-) diff --git a/consumer/consumer.go b/consumer/consumer.go index 56836ae..b5bf22f 100644 --- a/consumer/consumer.go +++ b/consumer/consumer.go @@ -275,7 +275,7 @@ func (dc *defaultConsumer) start() error { dc.subscriptionDataTable.Store(retryTopic, sub) } - dc.client = internal.GetOrNewRocketMQClient(dc.option.ClientOptions, nil) + //dc.client = internal.GetOrNewRocketMQClient(dc.option.ClientOptions, nil) if dc.model == Clustering { dc.option.ChangeInstanceNameToPID() dc.storage = NewRemoteOffsetStore(dc.consumerGroup, dc.client) diff --git a/consumer/push_consumer.go b/consumer/push_consumer.go index 88ab362..3b9747c 100644 --- a/consumer/push_consumer.go +++ b/consumer/push_consumer.go @@ -67,6 +67,7 @@ func NewPushConsumer(opts ...Option) (*pushConsumer, error) { internal.RegisterNamsrv(srvs) dc := &defaultConsumer{ + client: internal.GetOrNewRocketMQClient(defaultOpts.ClientOptions, nil), consumerGroup: defaultOpts.GroupName, cType: _PushConsume, state: internal.StateCreateJust, diff --git a/consumer/push_consumer_test.go b/consumer/push_consumer_test.go index f7bc454..2f4dbeb 100644 --- a/consumer/push_consumer_test.go +++ b/consumer/push_consumer_test.go @@ -16,3 +16,58 @@ limitations under the License. */ package consumer + +import ( + "context" + "fmt" + "github.com/apache/rocketmq-client-go/internal" + "github.com/apache/rocketmq-client-go/primitive" + "github.com/golang/mock/gomock" + . "github.com/smartystreets/goconvey/convey" + "testing" +) + +func mockB4Start(c *pushConsumer) { + c.topicSubscribeInfoTable.Store("TopicTest", []*primitive.MessageQueue{}) +} + +func TestStart(t *testing.T) { + Convey("test Start method", t, func() { + c, _ := NewPushConsumer( + WithGroupName("testGroup"), + WithNameServer([]string{"127.0.0.1:9876"}), + ) + + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + client := internal.NewMockRMQClient(ctrl) + c.client = client + + err := c.Subscribe("TopicTest", MessageSelector{}, func(ctx context.Context, + msgs ...*primitive.MessageExt) (ConsumeResult, error) { + fmt.Printf("subscribe callback: %v \n", msgs) + return ConsumeSuccess, nil + }) + + client.EXPECT().ClientID().Return("127.0.0.1@DEFAULT") + client.EXPECT().Start().Return() + client.EXPECT().RegisterConsumer(gomock.Any(), gomock.Any()).Return(nil) + client.EXPECT().UpdateTopicRouteInfo().AnyTimes().Return() + + Convey("test topic route info not found", func() { + client.EXPECT().Shutdown().Return() + err = c.Start() + So(err.Error(), ShouldContainSubstring, "route info not found") + }) + + Convey("test topic route info found", func() { + client.EXPECT().RebalanceImmediately().Return() + client.EXPECT().CheckClientInBroker().Return() + client.EXPECT().SendHeartbeatToAllBrokerWithLock().Return() + mockB4Start(c) + err = c.Start() + So(err, ShouldBeNil) + }) + }) +}