This is an automated email from the ASF dual-hosted git repository. dinglei pushed a commit to branch native in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git
The following commit(s) were added to refs/heads/native by this push:
     new 0897f19  [ISSUE #471] The message mode, ctype and where was wrong in the heartbeat data
0897f19 is described below

commit 0897f19d470248ac1af0e4985e93f8c4b2ce81b5
Author: DandelionJR <luyongjie1...@163.com>
AuthorDate: Fri Apr 24 10:00:07 2020 +0800

    [ISSUE #471] The message mode, ctype and where was wrong in the heartbeat data

    close #471
---
 consumer/push_consumer.go | 22 ++++++++++++++++++++++
 internal/client.go        |  9 ++++++---
 2 files changed, 28 insertions(+), 3 deletions(-)

diff --git a/consumer/push_consumer.go b/consumer/push_consumer.go
index 9118ae2..d14f0a5 100644
--- a/consumer/push_consumer.go
+++ b/consumer/push_consumer.go
@@ -267,6 +267,28 @@ func (pc *pushConsumer) IsUnitMode() bool {
 	return pc.unitMode
 }

+func (pc *pushConsumer) GetcType() string {
+	return string(pc.cType)
+}
+
+func (pc *pushConsumer) GetModel() string {
+	return pc.model.String()
+}
+
+func (pc *pushConsumer) GetWhere() string {
+	switch pc.fromWhere {
+	case ConsumeFromLastOffset:
+		return "CONSUME_FROM_LAST_OFFSET"
+	case ConsumeFromFirstOffset:
+		return "CONSUME_FROM_FIRST_OFFSET"
+	case ConsumeFromTimestamp:
+		return "CONSUME_FROM_TIMESTAMP"
+	default:
+		return "UNKOWN"
+	}
+
+}
+
 func (pc *pushConsumer) GetConsumerRunningInfo() *internal.ConsumerRunningInfo {
 	info := internal.NewConsumerRunningInfo()

diff --git a/internal/client.go b/internal/client.go
index 253b96e..954cc57 100644
--- a/internal/client.go
+++ b/internal/client.go
@@ -84,6 +84,9 @@ type InnerConsumer interface {
 	Rebalance()
 	IsUnitMode() bool
 	GetConsumerRunningInfo() *ConsumerRunningInfo
+	GetcType() string
+	GetModel() string
+	GetWhere() string
 }

 func DefaultClientOptions() ClientOptions {
@@ -454,9 +457,9 @@ func (c *rmqClient) SendHeartbeatToAllBrokerWithLock() {
 		consumer := value.(InnerConsumer)
 		cData := consumerData{
 			GroupName: key.(string),
-			CType: "CONSUME_PASSIVELY",
-			MessageModel: "CLUSTERING",
-			Where: "CONSUME_FROM_FIRST_OFFSET",
+			CType: consumeType(consumer.GetcType()),
+			MessageModel: strings.ToUpper(consumer.GetModel()),
+			Where: consumer.GetWhere(),
 			UnitMode: consumer.IsUnitMode(),
 			SubscriptionDatas: consumer.SubscriptionDataList(),
 		}