This is an automated email from the ASF dual-hosted git repository.

huzongtang pushed a commit to branch native
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git


The following commit(s) were added to refs/heads/native by this push:
     new 3a3f93b  [ISSUE #86] Add Interceptor for producer and consumer. (#85)
3a3f93b is described below

commit 3a3f93bf5d18d8e680146c50e76af730e72595fc
Author: xujianhai666 <[email protected]>
AuthorDate: Sat Jul 6 21:30:36 2019 +0800

    [ISSUE #86] Add Interceptor for producer and consumer. (#85)
    
    * add interceptor
    
    * add log
    
    * add producer example
    
    * refactor code according to new version
    
    * refactor code according to new version
    
    * add example
    
    * fix nil bug
    
    * delete extra code
    
    * delete test code
    
    * add comment. resolves #86
    
    * rename
    
    * rename
    
    * stash
    
    * stash
    
    * fix  bug
    
    * stash
    
    * refactor consumer interceptor
    
    * add example
    
    * add example
    
    * 重构interceptor
    
    * fix typo
    
    * add ctx key
    
    * remove extra code
    
    * add ctx to conusme
    
    * refactor consumer interceptor
    
    * refactor consumer interceptor
    
    * refactor consumer interceptor
    
    * lower case chainInterceptor
    
    * rename println
---
 examples/consumer/interceptor/main.go       |  76 +++++++++++++++++
 examples/consumer/{ => simple}/main.go      |  10 +--
 examples/producer/{ => interceptor}/main.go |  32 +++++--
 examples/producer/{ => simple}/main.go      |   8 +-
 internal/consumer/consumer.go               |   2 +-
 internal/consumer/pull_consumer.go          |   4 +-
 internal/consumer/push_consumer.go          | 124 +++++++++++++++++++---------
 internal/kernel/client.go                   |  56 +++++++------
 internal/producer/producer.go               | 101 ++++++++++++++++++----
 primitive/consume.go                        |  16 +++-
 primitive/ctx.go                            |  60 ++++++++++++++
 primitive/interceptor.go                    |  57 +++++++++++++
 primitive/options.go                        |  99 +++++++++++++++++++++-
 primitive/result.go                         |  18 +++-
 14 files changed, 556 insertions(+), 107 deletions(-)

diff --git a/examples/consumer/interceptor/main.go 
b/examples/consumer/interceptor/main.go
new file mode 100644
index 0000000..01fedc3
--- /dev/null
+++ b/examples/consumer/interceptor/main.go
@@ -0,0 +1,76 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package main
+
+import (
+       "context"
+       "fmt"
+       "os"
+       "time"
+
+       "github.com/apache/rocketmq-client-go/internal/consumer"
+       "github.com/apache/rocketmq-client-go/primitive"
+)
+
+func main() {
+       c, _ := consumer.NewPushConsumer("testGroup", "127.0.0.1:9876",
+               primitive.WithConsumerModel(primitive.Clustering),
+               
primitive.WithConsumeFromWhere(primitive.ConsumeFromFirstOffset),
+               primitive.WithChainConsumerInterceptor(UserFistInterceptor(), 
UserSecondInterceptor()))
+       err := c.Subscribe("TopicTest", primitive.MessageSelector{}, func(ctx 
*primitive.ConsumeMessageContext,
+               msgs []*primitive.MessageExt) (primitive.ConsumeResult, error) {
+               fmt.Println("subscribe callback: %v", msgs)
+               return primitive.ConsumeSuccess, nil
+       })
+       if err != nil {
+               fmt.Println(err.Error())
+       }
+       // Note: start after subscribe
+       err = c.Start()
+       if err != nil {
+               fmt.Println(err.Error())
+               os.Exit(-1)
+       }
+       time.Sleep(time.Hour)
+}
+
+func UserFistInterceptor() primitive.CInterceptor {
+       return func(ctx context.Context, req, reply interface{}, next 
primitive.CInvoker) error {
+               msgCtx, _ := primitive.GetConsumerCtx(ctx)
+               fmt.Printf("msgCtx: %v, mehtod: %s", msgCtx, 
primitive.GetMethod(ctx))
+
+               msgs := req.([]*primitive.MessageExt)
+               fmt.Printf("user first interceptor before invoke: %v\n", msgs)
+               e := next(ctx, msgs, reply)
+
+               holder := reply.(*primitive.ConsumeResultHolder)
+               fmt.Printf("user first interceptor after invoke: %v, result: 
%v\n", msgs, holder)
+               return e
+       }
+}
+
+func UserSecondInterceptor() primitive.CInterceptor {
+       return func(ctx context.Context, req, reply interface{}, next 
primitive.CInvoker)  error {
+               msgs := req.([]*primitive.MessageExt)
+               fmt.Printf("user second interceptor before invoke: %v\n", msgs)
+               e := next(ctx, msgs, reply)
+               holder := reply.(*primitive.ConsumeResultHolder)
+               fmt.Printf("user second interceptor after invoke: %v, result: 
%v\n", msgs, holder)
+               return e
+       }
+}
diff --git a/examples/consumer/main.go b/examples/consumer/simple/main.go
similarity index 79%
rename from examples/consumer/main.go
rename to examples/consumer/simple/main.go
index f433f73..70bbbd4 100644
--- a/examples/consumer/main.go
+++ b/examples/consumer/simple/main.go
@@ -27,14 +27,10 @@ import (
 )
 
 func main() {
-       c, _ := consumer.NewPushConsumer("testGroup", primitive.ConsumerOption{
-               NameServerAddr: "127.0.0.1:9876",
-               ConsumerModel:  primitive.Clustering,
-               FromWhere:      primitive.ConsumeFromFirstOffset,
-       })
-       err := c.Subscribe("test", primitive.MessageSelector{}, func(ctx 
*consumer.ConsumeMessageContext,
+       c, _ := consumer.NewPushConsumer("testGroup", "127.0.0.1:9876")
+       err := c.Subscribe("TopicTest", primitive.MessageSelector{}, func(ctx 
*primitive.ConsumeMessageContext,
                msgs []*primitive.MessageExt) (primitive.ConsumeResult, error) {
-               fmt.Println(msgs)
+               fmt.Println("subscribe callback: %v", msgs)
                return primitive.ConsumeSuccess, nil
        })
        if err != nil {
diff --git a/examples/producer/main.go b/examples/producer/interceptor/main.go
similarity index 56%
copy from examples/producer/main.go
copy to examples/producer/interceptor/main.go
index b52b266..f7bcf7a 100644
--- a/examples/producer/main.go
+++ b/examples/producer/interceptor/main.go
@@ -15,6 +15,7 @@ See the License for the specific language governing 
permissions and
 limitations under the License.
 */
 
+// Package main implements a producer with user custom interceptor.
 package main
 
 import (
@@ -27,19 +28,18 @@ import (
 )
 
 func main() {
-       opt := primitive.ProducerOptions{
-               NameServerAddr:           "127.0.0.1:9876",
-               RetryTimesWhenSendFailed: 2,
-       }
-       p, _ := producer.NewProducer(opt)
+       nameServerAddr := "127.0.0.1:9876"
+       p, _ := producer.NewProducer(nameServerAddr, primitive.WithRetry(2),
+               primitive.WithChainProducerInterceptor(UserFirstInterceptor(), 
UserSecondInterceptor()))
        err := p.Start()
        if err != nil {
                fmt.Printf("start producer error: %s", err.Error())
                os.Exit(1)
        }
-       for i := 0; i < 1000; i++ {
+       for i := 0; i < 10; i++ {
                res, err := p.SendSync(context.Background(), &primitive.Message{
-                       Topic: "test",
+                       //Topic: "test",
+                       Topic: "TopicTest",
                        Body:  []byte("Hello RocketMQ Go Client!"),
                })
 
@@ -54,3 +54,21 @@ func main() {
                fmt.Printf("shundown producer error: %s", err.Error())
        }
 }
+
+func UserFirstInterceptor() primitive.PInterceptor {
+       return func(ctx context.Context, req, reply interface{}, next 
primitive.PInvoker) error {
+               fmt.Printf("user first interceptor before invoke: req:%v, 
reply: %v\n", req, reply)
+               err := next(ctx, req, reply)
+               fmt.Printf("user first interceptor after invoke: req: %v, 
reply: %v \n", req, reply)
+               return err
+       }
+}
+
+func UserSecondInterceptor() primitive.PInterceptor {
+       return func(ctx context.Context, req, reply interface{}, next 
primitive.PInvoker) error {
+               fmt.Printf("user second interceptor before invoke: req: %v, 
reply: %v\n", req, reply)
+               err := next(ctx, req, reply)
+               fmt.Printf("user second interceptor after invoke: req: %v, 
reply: %v \n", req, reply)
+               return err
+       }
+}
diff --git a/examples/producer/main.go b/examples/producer/simple/main.go
similarity index 89%
rename from examples/producer/main.go
rename to examples/producer/simple/main.go
index b52b266..39c885d 100644
--- a/examples/producer/main.go
+++ b/examples/producer/simple/main.go
@@ -26,12 +26,10 @@ import (
        "github.com/apache/rocketmq-client-go/primitive"
 )
 
+// Package main implements a simple producer to send message.
 func main() {
-       opt := primitive.ProducerOptions{
-               NameServerAddr:           "127.0.0.1:9876",
-               RetryTimesWhenSendFailed: 2,
-       }
-       p, _ := producer.NewProducer(opt)
+       nameServerAddr := "127.0.0.1:9876"
+       p, _ := producer.NewProducer(nameServerAddr, primitive.WithRetry(2))
        err := p.Start()
        if err != nil {
                fmt.Printf("start producer error: %s", err.Error())
diff --git a/internal/consumer/consumer.go b/internal/consumer/consumer.go
index e9aa1b0..ecbbbb4 100644
--- a/internal/consumer/consumer.go
+++ b/internal/consumer/consumer.go
@@ -99,7 +99,7 @@ type defaultConsumer struct {
        state     kernel.ServiceState
        pause     bool
        once      sync.Once
-       option    primitive.ConsumerOption
+       option    primitive.ConsumerOptions
        // key: int, hash(*primitive.MessageQueue)
        // value: *processQueue
        processQueueTable sync.Map
diff --git a/internal/consumer/pull_consumer.go 
b/internal/consumer/pull_consumer.go
index ead633f..2338199 100644
--- a/internal/consumer/pull_consumer.go
+++ b/internal/consumer/pull_consumer.go
@@ -38,7 +38,7 @@ var (
        queueCounterTable sync.Map
 )
 
-func NewConsumer(config primitive.ConsumerOption) *defaultPullConsumer {
+func NewConsumer(config primitive.ConsumerOptions) *defaultPullConsumer {
        return &defaultPullConsumer{
                option: config,
        }
@@ -46,7 +46,7 @@ func NewConsumer(config primitive.ConsumerOption) 
*defaultPullConsumer {
 
 type defaultPullConsumer struct {
        state     kernel.ServiceState
-       option    primitive.ConsumerOption
+       option    primitive.ConsumerOptions
        client    *kernel.RMQClient
        GroupName string
        Model     primitive.MessageModel
diff --git a/internal/consumer/push_consumer.go 
b/internal/consumer/push_consumer.go
index be839bc..8acbc4c 100644
--- a/internal/consumer/push_consumer.go
+++ b/internal/consumer/push_consumer.go
@@ -49,46 +49,51 @@ type PushConsumer interface {
        Start() error
        Shutdown()
        Subscribe(topic string, selector primitive.MessageSelector,
-               f func(*ConsumeMessageContext, []*primitive.MessageExt) 
(primitive.ConsumeResult, error)) error
+               f func(*primitive.ConsumeMessageContext, 
[]*primitive.MessageExt) (primitive.ConsumeResult, error)) error
 }
 
 type pushConsumer struct {
        *defaultConsumer
        queueFlowControlTimes        int
        queueMaxSpanFlowControlTimes int
-       consume                      func(*ConsumeMessageContext, 
[]*primitive.MessageExt) (primitive.ConsumeResult, error)
+       consume                      func(*primitive.ConsumeMessageContext, 
[]*primitive.MessageExt) (primitive.ConsumeResult, error)
        submitToConsume              func(*processQueue, 
*primitive.MessageQueue)
        subscribedTopic              map[string]string
+
+       interceptor primitive.CInterceptor
 }
 
-func NewPushConsumer(consumerGroup string, opt primitive.ConsumerOption) 
(PushConsumer, error) {
-       if err := utils.VerifyIP(opt.NameServerAddr); err != nil {
+func NewPushConsumer(consumerGroup string, nameServerAddr string, opts 
...*primitive.ConsumerOption) (PushConsumer, error) {
+       if err := utils.VerifyIP(nameServerAddr); err != nil {
                return nil, err
        }
-       opt.InstanceName = "DEFAULT"
-       opt.ClientIP = utils.LocalIP()
-       if opt.NameServerAddr == "" {
-               rlog.Fatal("opt.NameServerAddr can't be empty")
+       if nameServerAddr == "" {
+               rlog.Fatal("opts.NameServerAddr can't be empty")
        }
-       err := os.Setenv(kernel.EnvNameServerAddr, opt.NameServerAddr)
+       err := os.Setenv(kernel.EnvNameServerAddr, nameServerAddr)
        if err != nil {
                rlog.Fatal("set env=EnvNameServerAddr error: %s ", err.Error())
        }
+
+       pushOpts := primitive.DefaultPushConsumerOptions()
+       for _, op := range opts {
+               op.Apply(&pushOpts)
+       }
+
+       pushOpts.NameServerAddr = nameServerAddr
+
        dc := &defaultConsumer{
                consumerGroup:  consumerGroup,
                cType:          _PushConsume,
                state:          kernel.StateCreateJust,
                prCh:           make(chan PullRequest, 4),
-               model:          opt.ConsumerModel,
-               consumeOrderly: opt.ConsumeOrderly,
-               fromWhere:      opt.FromWhere,
-               option:         opt,
+               model:          pushOpts.ConsumerModel,
+               consumeOrderly: pushOpts.ConsumeOrderly,
+               fromWhere:      pushOpts.FromWhere,
+               allocate:       pushOpts.Strategy,
+               option:         pushOpts,
        }
 
-       if opt.Strategy == nil {
-               opt.Strategy = primitive.AllocateByAveragely
-       }
-       dc.allocate = opt.Strategy
        p := &pushConsumer{
                defaultConsumer: dc,
                subscribedTopic: make(map[string]string, 0),
@@ -99,9 +104,37 @@ func NewPushConsumer(consumerGroup string, opt 
primitive.ConsumerOption) (PushCo
        } else {
                p.submitToConsume = p.consumeMessageCurrently
        }
+
+       chainInterceptor(p)
+
        return p, nil
 }
 
+// chainInterceptor chain list of interceptor as one interceptor
+func chainInterceptor(p *pushConsumer) {
+       interceptors := p.option.Interceptors
+       switch len(interceptors) {
+       case 0:
+               p.interceptor = nil
+       case 1:
+               p.interceptor = interceptors[0]
+       default:
+               p.interceptor = func(ctx context.Context, req, reply 
interface{}, invoker primitive.CInvoker) error {
+                       return interceptors[0](ctx, req, reply, 
getChainedInterceptor(interceptors, 0, invoker))
+               }
+       }
+}
+
+// getChainedInterceptor recursively generate the chained invoker.
+func getChainedInterceptor(interceptors []primitive.CInterceptor, cur int, 
finalInvoker primitive.CInvoker) primitive.CInvoker {
+       if cur == len(interceptors)-1 {
+               return finalInvoker
+       }
+       return func(ctx context.Context, req, reply interface{}) error {
+               return interceptors[cur+1](ctx, req, reply, 
getChainedInterceptor(interceptors, cur+1, finalInvoker))
+       }
+}
+
 func (pc *pushConsumer) Start() error {
        var err error
        pc.once.Do(func() {
@@ -164,7 +197,7 @@ func (pc *pushConsumer) Start() error {
 func (pc *pushConsumer) Shutdown() {}
 
 func (pc *pushConsumer) Subscribe(topic string, selector 
primitive.MessageSelector,
-       f func(*ConsumeMessageContext, []*primitive.MessageExt) 
(primitive.ConsumeResult, error)) error {
+       f func(*primitive.ConsumeMessageContext, []*primitive.MessageExt) 
(primitive.ConsumeResult, error)) error {
        if pc.state != kernel.StateCreateJust {
                return errors.New("subscribe topic only started before")
        }
@@ -424,6 +457,7 @@ func (pc *pushConsumer) pullMessage(request *PullRequest) {
                        sleepTime = _PullDelayTimeWhenError
                        goto NEXT
                }
+
                result, err := pc.client.PullMessage(context.Background(), 
brokerResult.BrokerAddr, pullRequest)
                if err != nil {
                        rlog.Warnf("pull message from %s error: %s", 
brokerResult.BrokerAddr, err.Error())
@@ -447,6 +481,8 @@ func (pc *pushConsumer) pullMessage(request *PullRequest) {
                        rt := time.Now().Sub(beginTime)
                        increasePullRT(pc.consumerGroup, request.mq.Topic, rt)
 
+                       
result.SetMessageExts(primitive.DecodeMessage(result.GetBody()))
+
                        msgFounded := result.GetMessageExts()
                        firstMsgOffset := int64(math.MaxInt64)
                        if msgFounded != nil && len(msgFounded) != 0 {
@@ -485,7 +521,7 @@ func (pc *pushConsumer) correctTagsOffset(pr *PullRequest) {
        // TODO
 }
 
-func (pc *pushConsumer) sendMessageBack(ctx *ConsumeMessageContext, msg 
*primitive.MessageExt) bool {
+func (pc *pushConsumer) sendMessageBack(ctx *primitive.ConsumeMessageContext, 
msg *primitive.MessageExt) bool {
        return true
 }
 
@@ -570,16 +606,6 @@ func (pc *pushConsumer) removeUnnecessaryMessageQueue(mq 
*primitive.MessageQueue
        return true
 }
 
-type ConsumeMessageContext struct {
-       consumerGroup string
-       msgs          []*primitive.MessageExt
-       mq            *primitive.MessageQueue
-       success       bool
-       status        string
-       // mqTractContext
-       properties map[string]string
-}
-
 func (pc *pushConsumer) consumeMessageCurrently(pq *processQueue, mq 
*primitive.MessageQueue) {
        msgs := pq.getMessages()
        if msgs == nil {
@@ -603,8 +629,8 @@ func (pc *pushConsumer) consumeMessageCurrently(pq 
*processQueue, mq *primitive.
                                return
                        }
 
-                       ctx := &ConsumeMessageContext{
-                               properties: make(map[string]string),
+                       msgCtx := &primitive.ConsumeMessageContext{
+                               Properties: make(map[string]string),
                        }
                        // TODO hook
                        beginTime := time.Now()
@@ -620,16 +646,40 @@ func (pc *pushConsumer) consumeMessageCurrently(pq 
*processQueue, mq *primitive.
                                                
beginTime.UnixNano()/int64(time.Millisecond), 10)
                                }
                        }
-                       result, err := pc.consume(ctx, subMsgs)
+                       var result primitive.ConsumeResult
+
+                       var err error
+                       if pc.interceptor == nil {
+                               result, err = pc.consume(msgCtx, subMsgs)
+                       } else {
+                               var container primitive.ConsumeResultHolder
+
+                               ctx := context.Background()
+                               ctx = primitive.WithConsumerCtx(ctx, msgCtx)
+                               ctx = primitive.WithMehod(ctx, 
primitive.ConsumerPush)
+
+                               err = pc.interceptor(ctx, subMsgs, &container, 
func(ctx context.Context, req, reply interface{}) error {
+                                       consumerCtx, _ := 
primitive.GetConsumerCtx(ctx)
+
+                                       msgs := req.([]*primitive.MessageExt)
+                                       r, e := pc.consume(consumerCtx, msgs)
+
+                                       realReply := 
reply.(*primitive.ConsumeResultHolder)
+                                       realReply.ConsumeResult = r
+                                       return e
+                               })
+                               result = container.ConsumeResult
+                       }
+
                        consumeRT := time.Now().Sub(beginTime)
                        if err != nil {
-                               ctx.properties["ConsumeContextType"] = 
"EXCEPTION"
+                               msgCtx.Properties["ConsumeContextType"] = 
"EXCEPTION"
                        } else if consumeRT >= pc.option.ConsumeTimeout {
-                               ctx.properties["ConsumeContextType"] = "TIMEOUT"
+                               msgCtx.Properties["ConsumeContextType"] = 
"TIMEOUT"
                        } else if result == primitive.ConsumeSuccess {
-                               ctx.properties["ConsumeContextType"] = "SUCCESS"
+                               msgCtx.Properties["ConsumeContextType"] = 
"SUCCESS"
                        } else {
-                               ctx.properties["ConsumeContextType"] = 
"RECONSUME_LATER"
+                               msgCtx.Properties["ConsumeContextType"] = 
"RECONSUME_LATER"
                        }
 
                        // TODO hook
@@ -648,7 +698,7 @@ func (pc *pushConsumer) consumeMessageCurrently(pq 
*processQueue, mq *primitive.
                                        } else {
                                                for i := 0; i < len(msgs); i++ {
                                                        msg := msgs[i]
-                                                       if 
!pc.sendMessageBack(ctx, msg) {
+                                                       if 
!pc.sendMessageBack(msgCtx, msg) {
                                                                
msg.ReconsumeTimes += 1
                                                                msgBackFailed = 
append(msgBackFailed, msg)
                                                        }
diff --git a/internal/kernel/client.go b/internal/kernel/client.go
index 4687989..94f45c9 100644
--- a/internal/kernel/client.go
+++ b/internal/kernel/client.go
@@ -292,7 +292,7 @@ func (c *RMQClient) SendMessageOneWay(ctx context.Context, 
brokerAddrs string, r
        return nil, err
 }
 
-func (c *RMQClient) ProcessSendResponse(brokerName string, cmd 
*remote.RemotingCommand, msgs ...*primitive.Message) *primitive.SendResult {
+func (c *RMQClient) ProcessSendResponse(brokerName string, cmd 
*remote.RemotingCommand, resp *primitive.SendResult, msgs 
...*primitive.Message) {
        var status primitive.SendStatus
        switch cmd.Code {
        case ResFlushDiskTimeout:
@@ -321,20 +321,20 @@ func (c *RMQClient) ProcessSendResponse(brokerName 
string, cmd *remote.RemotingC
 
        qId, _ := strconv.Atoi(cmd.ExtFields["queueId"])
        off, _ := strconv.ParseInt(cmd.ExtFields["queueOffset"], 10, 64)
-       return &primitive.SendResult{
-               Status:      status,
-               MsgID:       cmd.ExtFields["msgId"],
-               OffsetMsgID: cmd.ExtFields["msgId"],
-               MessageQueue: &primitive.MessageQueue{
-                       Topic:      msgs[0].Topic,
-                       BrokerName: brokerName,
-                       QueueId:    qId,
-               },
-               QueueOffset: off,
-               //TransactionID: sendResponse.TransactionId,
-               RegionID: regionId,
-               TraceOn:  trace != "" && trace != _TranceOff,
+
+       resp.Status = status
+       resp.MsgID = cmd.ExtFields["msgId"]
+       resp.OffsetMsgID = cmd.ExtFields["msgId"]
+       resp.MessageQueue = &primitive.MessageQueue{
+               Topic:      msgs[0].Topic,
+               BrokerName: brokerName,
+               QueueId:    qId,
        }
+       resp.QueueOffset = off
+       //TransactionID: sendResponse.TransactionId,
+       resp.RegionID = regionId
+       resp.TraceOn = trace != "" && trace != _TranceOff
+
 }
 
 // PullMessage with sync
@@ -349,6 +349,7 @@ func (c *RMQClient) PullMessage(ctx context.Context, 
brokerAddrs string, request
 }
 
 func (c *RMQClient) processPullResponse(response *remote.RemotingCommand) 
(*primitive.PullResult, error) {
+
        pullResult := &primitive.PullResult{}
        switch response.Code {
        case ResSuccess:
@@ -363,29 +364,32 @@ func (c *RMQClient) processPullResponse(response 
*remote.RemotingCommand) (*prim
                return nil, fmt.Errorf("unknown Response Code: %d, remark: %s", 
response.Code, response.Remark)
        }
 
-       v, exist := response.ExtFields["maxOffset"]
+       c.decodeCommandCustomHeader(pullResult, response)
+       pullResult.SetBody(response.Body)
+
+       return pullResult, nil
+}
+
+func (c *RMQClient) decodeCommandCustomHeader(pr *primitive.PullResult, cmd 
*remote.RemotingCommand) {
+       v, exist := cmd.ExtFields["maxOffset"]
        if exist {
-               pullResult.MaxOffset, _ = strconv.ParseInt(v, 10, 64)
+               pr.MaxOffset, _ = strconv.ParseInt(v, 10, 64)
        }
 
-       v, exist = response.ExtFields["minOffset"]
+       v, exist = cmd.ExtFields["minOffset"]
        if exist {
-               pullResult.MinOffset, _ = strconv.ParseInt(v, 10, 64)
+               pr.MinOffset, _ = strconv.ParseInt(v, 10, 64)
        }
 
-       v, exist = response.ExtFields["nextBeginOffset"]
+       v, exist = cmd.ExtFields["nextBeginOffset"]
        if exist {
-               pullResult.NextBeginOffset, _ = strconv.ParseInt(v, 10, 64)
+               pr.NextBeginOffset, _ = strconv.ParseInt(v, 10, 64)
        }
 
-       v, exist = response.ExtFields["suggestWhichBrokerId"]
+       v, exist = cmd.ExtFields["suggestWhichBrokerId"]
        if exist {
-               pullResult.SuggestWhichBrokerId, _ = strconv.ParseInt(v, 10, 64)
+               pr.SuggestWhichBrokerId, _ = strconv.ParseInt(v, 10, 64)
        }
-
-       //pullResult.messageExts = decodeMessage(response.Body) TODO parse in 
top
-
-       return pullResult, nil
 }
 
 // PullMessageAsync pull message async
diff --git a/internal/producer/producer.go b/internal/producer/producer.go
index 91bd389..fbcf5ee 100644
--- a/internal/producer/producer.go
+++ b/internal/producer/producer.go
@@ -40,25 +40,59 @@ type Producer interface {
        SendOneWay(context.Context, *primitive.Message) error
 }
 
-func NewProducer(opt primitive.ProducerOptions) (Producer, error) {
-       if err := utils.VerifyIP(opt.NameServerAddr); err != nil {
+func NewProducer(nameServerAddr string, opts ...*primitive.ProducerOption) 
(Producer, error) {
+       if err := utils.VerifyIP(nameServerAddr); err != nil {
                return nil, err
        }
-       if opt.RetryTimesWhenSendFailed == 0 {
-               opt.RetryTimesWhenSendFailed = 2
-       }
-       if opt.NameServerAddr == "" {
-               rlog.Fatal("opt.NameServerAddr can't be empty")
+
+       if nameServerAddr == "" {
+               rlog.Fatal("nameServerAddr can't be empty")
        }
-       err := os.Setenv(kernel.EnvNameServerAddr, opt.NameServerAddr)
+       err := os.Setenv(kernel.EnvNameServerAddr, nameServerAddr)
        if err != nil {
                rlog.Fatal("set env=EnvNameServerAddr error: %s ", err.Error())
        }
-       return &defaultProducer{
+
+       popts := primitive.DefaultProducerOptions()
+       for _, opt := range opts {
+               opt.Apply(&popts)
+       }
+       popts.NameServerAddr = nameServerAddr
+
+       producer := &defaultProducer{
                group:   "default",
-               client:  kernel.GetOrNewRocketMQClient(opt.ClientOption),
-               options: opt,
-       }, nil
+               client:  kernel.GetOrNewRocketMQClient(popts.ClientOption),
+               options: popts,
+       }
+
+       chainInterceptor(producer)
+
+       return producer, nil
+}
+
+// chainInterceptor chain list of interceptor as one interceptor
+func chainInterceptor(p *defaultProducer) {
+       interceptors := p.options.Interceptors
+       switch len(interceptors) {
+       case 0:
+               p.interceptor = nil
+       case 1:
+               p.interceptor = interceptors[0]
+       default:
+               p.interceptor = func(ctx context.Context, req, reply 
interface{}, invoker primitive.PInvoker) error {
+                       return interceptors[0](ctx, req, reply, 
getChainedInterceptor(interceptors, 0, invoker))
+               }
+       }
+}
+
+// getChainedInterceptor recursively generate the chained invoker.
+func getChainedInterceptor(interceptors []primitive.PInterceptor, cur int, 
finalInvoker primitive.PInvoker) primitive.PInvoker {
+       if cur == len(interceptors)-1 {
+               return finalInvoker
+       }
+       return func(ctx context.Context, req, reply interface{}) error {
+               return interceptors[cur+1](ctx, req, reply, 
getChainedInterceptor(interceptors, cur+1, finalInvoker))
+       }
 }
 
 type defaultProducer struct {
@@ -67,6 +101,8 @@ type defaultProducer struct {
        state       kernel.ServiceState
        options     primitive.ProducerOptions
        publishInfo sync.Map
+
+       interceptor primitive.PInterceptor
 }
 
 func (p *defaultProducer) Start() error {
@@ -89,11 +125,31 @@ func (p *defaultProducer) SendSync(ctx context.Context, 
msg *primitive.Message)
                return nil, errors.New("topic is nil")
        }
 
+       resp := new(primitive.SendResult)
+       if p.interceptor != nil {
+               primitive.WithMehod(ctx, primitive.SendSync)
+               err := p.interceptor(ctx, msg, resp, func(ctx context.Context, 
req, reply interface{}) error {
+                       var err error
+                       realReq := req.(*primitive.Message)
+                       realReply := reply.(*primitive.SendResult)
+                       err = p.sendSync(ctx, realReq, realReply)
+                       return err
+               })
+               return resp, err
+       }
+
+       p.sendSync(ctx, msg, resp)
+       return resp, nil
+}
+
+func (p *defaultProducer) sendSync(ctx context.Context, msg 
*primitive.Message, resp *primitive.SendResult) error {
+
        retryTime := 1 + p.options.RetryTimesWhenSendFailed
 
        var (
                err error
        )
+
        for retryCount := 0; retryCount < retryTime; retryCount++ {
                mq := p.selectMessageQueue(msg.Topic)
                if mq == nil {
@@ -103,7 +159,7 @@ func (p *defaultProducer) SendSync(ctx context.Context, msg 
*primitive.Message)
 
                addr := kernel.FindBrokerAddrByName(mq.BrokerName)
                if addr == "" {
-                       return nil, fmt.Errorf("topic=%s route info not found", 
mq.Topic)
+                       return fmt.Errorf("topic=%s route info not found", 
mq.Topic)
                }
 
                res, _err := p.client.InvokeSync(addr, p.buildSendRequest(mq, 
msg), 3*time.Second)
@@ -111,9 +167,10 @@ func (p *defaultProducer) SendSync(ctx context.Context, 
msg *primitive.Message)
                        err = _err
                        continue
                }
-               return p.client.ProcessSendResponse(mq.BrokerName, res, msg), 
nil
+               p.client.ProcessSendResponse(mq.BrokerName, res, resp, msg)
+               return nil
        }
-       return nil, err
+       return err
 }
 
 func (p *defaultProducer) SendOneWay(ctx context.Context, msg 
*primitive.Message) error {
@@ -125,6 +182,17 @@ func (p *defaultProducer) SendOneWay(ctx context.Context, 
msg *primitive.Message
                return errors.New("topic is nil")
        }
 
+       if p.interceptor != nil {
+               primitive.WithMehod(ctx, primitive.SendOneway)
+               return p.interceptor(ctx, msg, nil, func(ctx context.Context, 
req, reply interface{}) error {
+                       return p.SendOneWay(ctx, msg)
+               })
+       }
+
+       return p.sendOneWay(ctx, msg)
+}
+
+func (p *defaultProducer) sendOneWay(ctx context.Context, msg 
*primitive.Message) error {
        retryTime := 1 + p.options.RetryTimesWhenSendFailed
 
        var (
@@ -151,7 +219,8 @@ func (p *defaultProducer) SendOneWay(ctx context.Context, 
msg *primitive.Message
        return err
 }
 
-func (p *defaultProducer) buildSendRequest(mq *primitive.MessageQueue, msg 
*primitive.Message) *remote.RemotingCommand {
+func (p *defaultProducer) buildSendRequest(mq *primitive.MessageQueue,
+       msg *primitive.Message) *remote.RemotingCommand {
        req := &kernel.SendMessageRequest{
                ProducerGroup:  p.group,
                Topic:          mq.Topic,
diff --git a/primitive/consume.go b/primitive/consume.go
index 40e89b9..5edd461 100644
--- a/primitive/consume.go
+++ b/primitive/consume.go
@@ -4,7 +4,7 @@ package primitive
 // </p>
 //
 // RocketMQ supports two message models: clustering and broadcasting. If 
clustering is set, consumer clients with
-// the same {@link #consumerGroup} would only consume shards of the messages 
subscribed, which achieves load
+// the same {@link #ConsumerGroup} would only consume shards of the messages 
subscribed, which achieves load
 // balances; Conversely, if the broadcasting is set, each consumer client will 
consume all subscribed messages
 // separately.
 // </p>
@@ -126,3 +126,17 @@ const (
        ConsumeSuccess ConsumeResult = iota
        ConsumeRetryLater
 )
+
+type ConsumeMessageContext struct {
+       ConsumerGroup string
+       Msgs          []*MessageExt
+       MQ            *MessageQueue
+       Success       bool
+       Status        string
+       // mqTractContext
+       Properties map[string]string
+}
+
+type ConsumeResultHolder struct {
+       ConsumeResult
+}
\ No newline at end of file
diff --git a/primitive/ctx.go b/primitive/ctx.go
new file mode 100644
index 0000000..e74f91a
--- /dev/null
+++ b/primitive/ctx.go
@@ -0,0 +1,60 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+/*
+ * Define the ctx key and value type.
+ */
+package primitive
+
+import "context"
+
+type CtxKey int
+
+const (
+       method CtxKey = iota
+       msgCtx
+
+
+       // method name in  producer
+       SendSync = "SendSync"
+       SendOneway = "SendOneway"
+       // method name in consumer
+       ConsumerPush = "ConsumerPush"
+       ConsumerPull = "ConsumerPull"
+)
+
+// WithMehod set call method name
+func WithMehod(ctx context.Context, m string) context.Context {
+       return context.WithValue(ctx, method, m)
+}
+
+// GetMethod get call method name
+func GetMethod(ctx context.Context) string {
+       return ctx.Value(method).(string)
+}
+
+// WithConsumerCtx set ConsumeMessageContext in PushConsumer
+func WithConsumerCtx(ctx context.Context, c *ConsumeMessageContext) 
context.Context {
+       return context.WithValue(ctx, msgCtx, c)
+}
+
+// GetConsumerCtx get ConsumeMessageContext, only legal in PushConsumer. so 
should add bool return param indicate
+// whether exist.
+func GetConsumerCtx(ctx context.Context) (*ConsumeMessageContext, bool) {
+       c, exist := ctx.Value(msgCtx).(*ConsumeMessageContext)
+       return c, exist
+}
diff --git a/primitive/interceptor.go b/primitive/interceptor.go
new file mode 100644
index 0000000..cb86d19
--- /dev/null
+++ b/primitive/interceptor.go
@@ -0,0 +1,57 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package primitive
+
+import (
+       "context"
+)
+
+// PInvoker finish a send invoke on producer.
+type PInvoker func(ctx context.Context, req, reply interface{}) error
+
+// PInterceptor intercepts the execution of a send invoke on producer.
+type PInterceptor func(ctx context.Context, req, reply interface{}, next 
PInvoker) error
+
+// RetryInterceptor retry when send failed.
+func RetryPInterceptor() PInterceptor {
+       return func(ctx context.Context, req, reply interface{}, next PInvoker) 
error {
+               return nil
+       }
+}
+
+// TimeoutInterceptor add a timeout listener in case of operation timeout.
+func TimeoutPInterceptor() PInterceptor {
+       return func(ctx context.Context, req, reply interface{}, next PInvoker) 
error {
+               return nil
+       }
+}
+
+// LogInterceptor log a send invoke.
+func LogPInterceptor() PInterceptor {
+       return func(ctx context.Context, req, reply interface{}, next PInvoker) 
error {
+               return nil
+       }
+}
+
+// CInvoker finish a message invoke on consumer. In PushConsumer call, the req 
is []*MessageExt type and the reply is *ConsumeResultHolder,
+// use type assert to get real type.
+type CInvoker func(ctx context.Context, req , reply interface{}) error
+
+// CInterceptor intercepts the invoke of a consume on messages. In 
PushConsumer call, the req is []*MessageExt type and the reply is 
*ConsumeResultHolder,
+// use type assert to get real type.
+type CInterceptor func(ctx context.Context, req, reply interface{}, next 
CInvoker) error
diff --git a/primitive/options.go b/primitive/options.go
index b657ac0..0e40415 100644
--- a/primitive/options.go
+++ b/primitive/options.go
@@ -22,9 +22,13 @@ import (
        "os"
        "strconv"
        "time"
+
+       "github.com/apache/rocketmq-client-go/utils"
 )
 
 type ProducerOptions struct {
+       Interceptors []PInterceptor
+
        ClientOption
        NameServerAddr           string
        GroupName                string
@@ -32,7 +36,48 @@ type ProducerOptions struct {
        UnitMode                 bool
 }
 
-type ConsumerOption struct {
+func DefaultProducerOptions() ProducerOptions {
+       return ProducerOptions{
+               RetryTimesWhenSendFailed:  2,
+       }
+}
+
+// ProducerOption configures how we create the producer by set ProducerOptions 
value.
+type ProducerOption struct {
+       Apply func(*ProducerOptions)
+}
+
+func NewProducerOption(f func(options *ProducerOptions)) *ProducerOption {
+       return &ProducerOption{
+               Apply: f,
+       }
+}
+
+// WithProducerInterceptor returns a ProducerOption that specifies the 
interceptor for producer.
+func WithProducerInterceptor(f PInterceptor) *ProducerOption {
+       return NewProducerOption(func(options *ProducerOptions) {
+               options.Interceptors = append(options.Interceptors, f)
+       })
+}
+
+// WithChainProducerInterceptor returns a ProducerOption that specifies the 
chained interceptor for producer.
+// The first interceptor will be the outer most, while the last interceptor 
will be the inner most wrapper
+// around the real call.
+func WithChainProducerInterceptor(fs ...PInterceptor) *ProducerOption {
+       return NewProducerOption(func(options *ProducerOptions) {
+               options.Interceptors = append(options.Interceptors, fs...)
+       })
+}
+
+// WithRetry return a ProducerOption that specifies the retry times when send 
failed.
+// TODO: use retryMiddleeware instead.
+func WithRetry(retries int) *ProducerOption {
+       return  NewProducerOption(func(options *ProducerOptions) {
+               options.RetryTimesWhenSendFailed = retries
+       })
+}
+
+type ConsumerOptions struct {
        ClientOption
        NameServerAddr string
 
@@ -92,7 +137,7 @@ type ConsumerOption struct {
 
        // Max re-consume times. -1 means 16 times.
        //
-       // If messages are re-consumed more than {@link #maxReconsumeTimes} 
before success, it's be directed to a deletion
+       // If messages are re-consumed more than {@link #maxReconsumeTimes} 
before Success, it's be directed to a deletion
        // queue waiting.
        MaxReconsumeTimes int
 
@@ -107,6 +152,56 @@ type ConsumerOption struct {
        ConsumeOrderly bool
        FromWhere      ConsumeFromWhere
        // TODO traceDispatcher
+
+       Interceptors []CInterceptor
+}
+
+func DefaultPushConsumerOptions() ConsumerOptions{
+       return ConsumerOptions{
+               ClientOption: ClientOption{
+                       InstanceName: "DEFAULT",
+                       ClientIP: utils.LocalIP(),
+               },
+               Strategy: AllocateByAveragely,
+       }
+}
+
+type ConsumerOption struct {
+       Apply func(*ConsumerOptions)
+}
+
+func NewConsumerOption(f func(*ConsumerOptions)) *ConsumerOption {
+       return &ConsumerOption{
+               Apply: f,
+       }
+}
+
+func WithConsumerModel(m MessageModel) *ConsumerOption {
+       return NewConsumerOption(func(options *ConsumerOptions) {
+               options.ConsumerModel = m
+       })
+}
+
+func WithConsumeFromWhere(w ConsumeFromWhere) *ConsumerOption{
+       return NewConsumerOption(func(options *ConsumerOptions) {
+               options.FromWhere = w
+       })
+}
+
+// WithConsumerInterceptor returns a ConsumerOption that specifies the 
interceptor for consumer.
+func WithConsumerInterceptor(f CInterceptor) *ConsumerOption {
+       return NewConsumerOption(func(options *ConsumerOptions) {
+               options.Interceptors = append(options.Interceptors, f)
+       })
+}
+
+// WithChainConsumerInterceptor returns a ConsumerOption that specifies the 
chained interceptor for consumer.
+// The first interceptor will be the outer most, while the last interceptor 
will be the inner most wrapper
+// around the real call.
+func WithChainConsumerInterceptor(fs ...CInterceptor) *ConsumerOption {
+       return NewConsumerOption(func(options *ConsumerOptions) {
+               options.Interceptors = append(options.Interceptors, fs...)
+       })
 }
 
 func (opt *ClientOption) ChangeInstanceNameToPID() {
diff --git a/primitive/result.go b/primitive/result.go
index c37358c..628f243 100644
--- a/primitive/result.go
+++ b/primitive/result.go
@@ -56,10 +56,10 @@ func (result *SendResult) String() string {
                result.Status, result.MsgID, result.OffsetMsgID, 
result.QueueOffset, result.MessageQueue.String())
 }
 
-// PullStatus pull status
+// PullStatus pull Status
 type PullStatus int
 
-// predefined pull status
+// predefined pull Status
 const (
        PullFound PullStatus = iota
        PullNoNewMsg
@@ -75,7 +75,11 @@ type PullResult struct {
        MaxOffset            int64
        Status               PullStatus
        SuggestWhichBrokerId int64
+
+       // messageExts message info
        messageExts          []*MessageExt
+       //
+       body []byte
 }
 
 func (result *PullResult) GetMessageExts() []*MessageExt {
@@ -93,11 +97,19 @@ func (result *PullResult) GetMessages() []*Message {
        return toMessages(result.messageExts)
 }
 
+func (result *PullResult) SetBody(data []byte) {
+       result.body = data
+}
+
+func (result *PullResult) GetBody() []byte {
+       return result.body
+}
+
 func (result *PullResult) String() string {
        return ""
 }
 
-func decodeMessage(data []byte) []*MessageExt {
+func DecodeMessage(data []byte) []*MessageExt {
        msgs := make([]*MessageExt, 0)
        buf := bytes.NewBuffer(data)
        count := 0

Reply via email to