This is an automated email from the ASF dual-hosted git repository.

dinglei pushed a commit to branch native
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git


The following commit(s) were added to refs/heads/native by this push:
     new 0897f19  [ISSUE #471] The message mode, ctype and where was wrong in 
the heartbeat data
0897f19 is described below

commit 0897f19d470248ac1af0e4985e93f8c4b2ce81b5
Author: DandelionJR <luyongjie1...@163.com>
AuthorDate: Fri Apr 24 10:00:07 2020 +0800

    [ISSUE #471] The message mode, ctype and where was wrong in the heartbeat 
data
    
    close #471
---
 consumer/push_consumer.go | 22 ++++++++++++++++++++++
 internal/client.go        |  9 ++++++---
 2 files changed, 28 insertions(+), 3 deletions(-)

diff --git a/consumer/push_consumer.go b/consumer/push_consumer.go
index 9118ae2..d14f0a5 100644
--- a/consumer/push_consumer.go
+++ b/consumer/push_consumer.go
@@ -267,6 +267,28 @@ func (pc *pushConsumer) IsUnitMode() bool {
        return pc.unitMode
 }
 
+func (pc *pushConsumer) GetcType() string {
+       return string(pc.cType)
+}
+
+func (pc *pushConsumer) GetModel() string {
+       return pc.model.String()
+}
+
+func (pc *pushConsumer) GetWhere() string {
+       switch pc.fromWhere {
+       case ConsumeFromLastOffset:
+               return "CONSUME_FROM_LAST_OFFSET"
+       case ConsumeFromFirstOffset:
+               return "CONSUME_FROM_FIRST_OFFSET"
+       case ConsumeFromTimestamp:
+               return "CONSUME_FROM_TIMESTAMP"
+       default:
+               return "UNKOWN"
+       }
+
+}
+
 func (pc *pushConsumer) GetConsumerRunningInfo() *internal.ConsumerRunningInfo 
{
        info := internal.NewConsumerRunningInfo()
 
diff --git a/internal/client.go b/internal/client.go
index 253b96e..954cc57 100644
--- a/internal/client.go
+++ b/internal/client.go
@@ -84,6 +84,9 @@ type InnerConsumer interface {
        Rebalance()
        IsUnitMode() bool
        GetConsumerRunningInfo() *ConsumerRunningInfo
+       GetcType() string
+       GetModel() string
+       GetWhere() string
 }
 
 func DefaultClientOptions() ClientOptions {
@@ -454,9 +457,9 @@ func (c *rmqClient) SendHeartbeatToAllBrokerWithLock() {
                consumer := value.(InnerConsumer)
                cData := consumerData{
                        GroupName:         key.(string),
-                       CType:             "CONSUME_PASSIVELY",
-                       MessageModel:      "CLUSTERING",
-                       Where:             "CONSUME_FROM_FIRST_OFFSET",
+                       CType:             consumeType(consumer.GetcType()),
+                       MessageModel:      strings.ToUpper(consumer.GetModel()),
+                       Where:             consumer.GetWhere(),
                        UnitMode:          consumer.IsUnitMode(),
                        SubscriptionDatas: consumer.SubscriptionDataList(),
                }

Reply via email to