This is an automated email from the ASF dual-hosted git repository.

dinglei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new c97d492  [ISSUE #621] Support consume message directly (#622)
c97d492 is described below

commit c97d492fa2505361021311f8675464285563c6e5
Author: 张旭 <[email protected]>
AuthorDate: Tue Mar 16 19:38:44 2021 +0800

    [ISSUE #621] Support consume message directly (#622)
    
    * Support consume message directly
---
 consumer/push_consumer.go | 53 +++++++++++++++++++++++++++++++++++++++++++++++
 internal/client.go        | 40 +++++++++++++++++++++++++++++++++++
 internal/model.go         | 27 ++++++++++++++++++++++++
 internal/model_test.go    | 42 +++++++++++++++++++++++++++++++++++++
 internal/request.go       | 38 +++++++++++++++++++++++++++++++++
 5 files changed, 200 insertions(+)

diff --git a/consumer/push_consumer.go b/consumer/push_consumer.go
index f13d5e2..8fb4637 100644
--- a/consumer/push_consumer.go
+++ b/consumer/push_consumer.go
@@ -291,6 +291,59 @@ func (pc *pushConsumer) GetWhere() string {
 
 }
 
+func (pc *pushConsumer) ConsumeMessageDirectly(msg *primitive.MessageExt, 
brokerName string) *internal.ConsumeMessageDirectlyResult {
+       var msgs = []*primitive.MessageExt{msg}
+       var mq = &primitive.MessageQueue{
+               Topic:      msg.Topic,
+               BrokerName: brokerName,
+               QueueId:    msg.Queue.QueueId,
+       }
+
+       beginTime := time.Now()
+       pc.resetRetryAndNamespace(msgs)
+       var result ConsumeResult
+
+       var err error
+       msgCtx := &primitive.ConsumeMessageContext{
+               Properties:    make(map[string]string),
+               ConsumerGroup: pc.consumerGroup,
+               MQ:            mq,
+               Msgs:          msgs,
+       }
+       ctx := context.Background()
+       ctx = primitive.WithConsumerCtx(ctx, msgCtx)
+       ctx = primitive.WithMethod(ctx, primitive.ConsumerPush)
+       concurrentCtx := primitive.NewConsumeConcurrentlyContext()
+       concurrentCtx.MQ = *mq
+       ctx = primitive.WithConcurrentlyCtx(ctx, concurrentCtx)
+
+       result, err = pc.consumeInner(ctx, msgs)
+
+       consumeRT := time.Now().Sub(beginTime)
+
+       res := &internal.ConsumeMessageDirectlyResult{
+               Order:          false,
+               AutoCommit:     true,
+               SpentTimeMills: int64(consumeRT / time.Millisecond),
+       }
+
+       if err != nil {
+               msgCtx.Properties[primitive.PropCtxType] = 
string(primitive.ExceptionReturn)
+               res.ConsumeResult = internal.ThrowException
+               res.Remark = err.Error()
+       } else if result == ConsumeSuccess {
+               msgCtx.Properties[primitive.PropCtxType] = 
string(primitive.SuccessReturn)
+               res.ConsumeResult = internal.ConsumeSuccess
+       } else if result == ConsumeRetryLater {
+               msgCtx.Properties[primitive.PropCtxType] = 
string(primitive.FailedReturn)
+               res.ConsumeResult = internal.ConsumeRetryLater
+       }
+
+       increaseConsumeRT(pc.consumerGroup, mq.Topic, 
int64(consumeRT/time.Millisecond))
+
+       return res
+}
+
 func (pc *pushConsumer) GetConsumerRunningInfo() *internal.ConsumerRunningInfo 
{
        info := internal.NewConsumerRunningInfo()
 
diff --git a/internal/client.go b/internal/client.go
index e2264a7..bc523ab 100644
--- a/internal/client.go
+++ b/internal/client.go
@@ -84,6 +84,7 @@ type InnerConsumer interface {
        Rebalance()
        IsUnitMode() bool
        GetConsumerRunningInfo() *ConsumerRunningInfo
+       ConsumeMessageDirectly(msg *primitive.MessageExt, brokerName string) 
*ConsumeMessageDirectlyResult
        GetcType() string
        GetModel() string
        GetWhere() string
@@ -252,6 +253,36 @@ func GetOrNewRocketMQClient(option ClientOptions, 
callbackCh chan interface{}) R
                        }
                        return res
                })
+
+               
client.remoteClient.RegisterRequestFunc(ReqConsumeMessageDirectly, func(req 
*remote.RemotingCommand, addr net.Addr) *remote.RemotingCommand {
+                       rlog.Info("receive consume message directly 
request...", nil)
+                       header := new(ConsumeMessageDirectlyHeader)
+                       header.Decode(req.ExtFields)
+                       val, exist := clientMap.Load(header.clientID)
+                       res := remote.NewRemotingCommand(ResError, nil, nil)
+                       if !exist {
+                               res.Remark = fmt.Sprintf("Can't find specified 
client instance of: %s", header.clientID)
+                       } else {
+                               cli, ok := val.(*rmqClient)
+                               msg := primitive.DecodeMessage(req.Body)[0]
+                               var consumeMessageDirectlyResult 
*ConsumeMessageDirectlyResult
+                               if ok {
+                                       consumeMessageDirectlyResult = 
cli.consumeMessageDirectly(msg, header.consumerGroup, header.brokerName)
+                               }
+                               if consumeMessageDirectlyResult != nil {
+                                       res.Code = ResSuccess
+                                       data, err := 
consumeMessageDirectlyResult.Encode()
+                                       if err != nil {
+                                               res.Remark = fmt.Sprintf("json 
marshal error: %s", err.Error())
+                                       } else {
+                                               res.Body = data
+                                       }
+                               } else {
+                                       res.Remark = "there is unexpected error 
when consume message directly, please check log"
+                               }
+                       }
+                       return res
+               })
        }
        return actual.(*rmqClient)
 }
@@ -744,6 +775,15 @@ func (c *rmqClient) getConsumerRunningInfo(group string) 
*ConsumerRunningInfo {
        return info
 }
 
+func (c *rmqClient) consumeMessageDirectly(msg *primitive.MessageExt, group 
string, brokerName string) *ConsumeMessageDirectlyResult {
+       consumer, exist := c.consumerMap.Load(group)
+       if !exist {
+               return nil
+       }
+       res := consumer.(InnerConsumer).ConsumeMessageDirectly(msg, brokerName)
+       return res
+}
+
 func routeData2SubscribeInfo(topic string, data *TopicRouteData) 
[]*primitive.MessageQueue {
        list := make([]*primitive.MessageQueue, 0)
        for idx := range data.QueueDataList {
diff --git a/internal/model.go b/internal/model.go
index 0ee9ccc..934c610 100644
--- a/internal/model.go
+++ b/internal/model.go
@@ -261,3 +261,30 @@ func NewConsumerRunningInfo() *ConsumerRunningInfo {
                StatusTable:      make(map[string]ConsumeStatus),
        }
 }
+
+type ConsumeMessageDirectlyResult struct {
+       Order          bool          `json:"order"`
+       AutoCommit     bool          `json:"autoCommit"`
+       ConsumeResult  ConsumeResult `json:"consumeResult"`
+       Remark         string        `json:"remark"`
+       SpentTimeMills int64         `json:"spentTimeMills"`
+}
+
+type ConsumeResult int
+
+const (
+       ConsumeSuccess ConsumeResult = iota
+       ConsumeRetryLater
+       Rollback
+       Commit
+       ThrowException
+       ReturnNull
+)
+
+func (result ConsumeMessageDirectlyResult) Encode() ([]byte, error) {
+       data, err := json.Marshal(result)
+       if err != nil {
+               return nil, err
+       }
+       return data, nil
+}
diff --git a/internal/model_test.go b/internal/model_test.go
index 0d8dcd7..57ff0af 100644
--- a/internal/model_test.go
+++ b/internal/model_test.go
@@ -362,3 +362,45 @@ func TestConsumerRunningInfo_MarshalJSON(t *testing.T) {
                })
        })
 }
+
+func TestConsumeMessageDirectlyResult_MarshalJSON(t *testing.T) {
+       Convey("test ConsumeMessageDirectlyResult MarshalJson", t, func() {
+               Convey("test consume success", func() {
+                       consumeMessageDirectlyResult := 
ConsumeMessageDirectlyResult{
+                               Order:          false,
+                               AutoCommit:     true,
+                               SpentTimeMills: 2,
+                       }
+                       consumeMessageDirectlyResult.ConsumeResult = 
ConsumeSuccess
+                       data, err := consumeMessageDirectlyResult.Encode()
+                       So(err, ShouldBeNil)
+                       fmt.Printf("json consumeMessageDirectlyResult: %s\n", 
string(data))
+               })
+
+               Convey("test consume timeout", func() {
+                       consumeResult := ConsumeMessageDirectlyResult{
+                               Order:          false,
+                               AutoCommit:     true,
+                               SpentTimeMills: 2,
+                       }
+                       consumeResult.ConsumeResult = ReturnNull
+                       data, err := consumeResult.Encode()
+                       So(err, ShouldBeNil)
+                       fmt.Printf("json consumeMessageDirectlyResult: %s\n", 
string(data))
+               })
+
+               Convey("test consume exception", func() {
+                       consumeResult := ConsumeMessageDirectlyResult{
+                               Order:          false,
+                               AutoCommit:     true,
+                               SpentTimeMills: 5,
+                       }
+                       consumeResult.ConsumeResult = ThrowException
+                       consumeResult.Remark = "Unknown Exception"
+                       data, err := consumeResult.Encode()
+                       So(err, ShouldBeNil)
+                       fmt.Printf("json consumeMessageDirectlyResult: %s\n", 
string(data))
+               })
+       })
+
+}
diff --git a/internal/request.go b/internal/request.go
index fa88efe..ed3de33 100644
--- a/internal/request.go
+++ b/internal/request.go
@@ -407,3 +407,41 @@ func (request *DeleteTopicRequestHeader) Encode() 
map[string]string {
 
        return maps
 }
+
+type ConsumeMessageDirectlyHeader struct {
+       consumerGroup string
+       clientID      string
+       msgId         string
+       brokerName    string
+}
+
+func (request *ConsumeMessageDirectlyHeader) Encode() map[string]string {
+       maps := make(map[string]string)
+       maps["consumerGroup"] = request.consumerGroup
+       maps["clientId"] = request.clientID
+       maps["msgId"] = request.msgId
+       maps["brokerName"] = request.brokerName
+       return maps
+}
+
+func (request *ConsumeMessageDirectlyHeader) Decode(properties 
map[string]string) {
+       if len(properties) == 0 {
+               return
+       }
+
+       if v, existed := properties["consumerGroup"]; existed {
+               request.consumerGroup = v
+       }
+
+       if v, existed := properties["clientId"]; existed {
+               request.clientID = v
+       }
+
+       if v, existed := properties["msgId"]; existed {
+               request.msgId = v
+       }
+
+       if v, existed := properties["brokerName"]; existed {
+               request.brokerName = v
+       }
+}

Reply via email to