This is an automated email from the ASF dual-hosted git repository.

dinglei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new 46701f1  feat(internal): support reset consumer offset (#682)
46701f1 is described below

commit 46701f1d6e4b3fbbbd1e702a0cae9ae947b91da3
Author: 张旭 <[email protected]>
AuthorDate: Fri Jul 30 16:49:40 2021 +0800

    feat(internal): support reset consumer offset (#682)
    
    Co-authored-by: zhangxu16 <[email protected]>
---
 consumer/process_queue.go |  1 +
 consumer/push_consumer.go | 15 +++++++++------
 internal/client.go        | 27 +++++++++++++++++++++++++++
 internal/model.go         | 47 +++++++++++++++++++++++++++++++++++++++++++++++
 internal/model_test.go    | 16 ++++++++++++++++
 internal/request.go       | 35 ++++++++++++++++++++++++++++++++++-
 rlog/log.go               |  1 +
 7 files changed, 135 insertions(+), 7 deletions(-)

diff --git a/consumer/process_queue.go b/consumer/process_queue.go
index 0e9d8ec..76a9236 100644
--- a/consumer/process_queue.go
+++ b/consumer/process_queue.go
@@ -343,6 +343,7 @@ func (pq *processQueue) MaxOrderlyCache() int64 {
 
 func (pq *processQueue) clear() {
        pq.mutex.Lock()
+       defer pq.mutex.Unlock()
        pq.msgCache.Clear()
        pq.cachedMsgCount = 0
        pq.cachedMsgSize = 0
diff --git a/consumer/push_consumer.go b/consumer/push_consumer.go
index ec44a1e..c84ce84 100644
--- a/consumer/push_consumer.go
+++ b/consumer/push_consumer.go
@@ -831,7 +831,7 @@ func (pc *pushConsumer) resume() {
        rlog.Info(fmt.Sprintf("resume consumer: %s", pc.consumerGroup), nil)
 }
 
-func (pc *pushConsumer) resetOffset(topic string, table map[primitive.MessageQueue]int64) {
+func (pc *pushConsumer) ResetOffset(topic string, table map[primitive.MessageQueue]int64) {
        //topic := cmd.ExtFields["topic"]
        //group := cmd.ExtFields["group"]
        //if topic == "" || group == "" {
@@ -857,11 +857,13 @@ func (pc *pushConsumer) resetOffset(topic string, table map[primitive.MessageQue
        //      rlog.Infof("[reset-offset] consumer dose not exist. group=%s", group)
        //      return
        //}
+       pc.suspend()
+       defer pc.resume()
 
        pc.processQueueTable.Range(func(key, value interface{}) bool {
                mq := key.(primitive.MessageQueue)
                pq := value.(*processQueue)
-               if _, ok := table[mq]; !ok {
+               if _, ok := table[mq]; ok && mq.Topic == topic {
                        pq.WithDropped(true)
                        pq.clear()
                }
@@ -872,16 +874,17 @@ func (pc *pushConsumer) resetOffset(topic string, table map[primitive.MessageQue
        if !exist {
                return
        }
-       queuesOfTopic := v.([]primitive.MessageQueue)
+       queuesOfTopic := v.([]*primitive.MessageQueue)
        for _, k := range queuesOfTopic {
-               if _, ok := table[k]; ok {
-                       pc.storage.update(&k, table[k], false)
+               if _, ok := table[*k]; ok {
+                       pc.storage.update(k, table[*k], false)
                        v, exist := pc.processQueueTable.Load(k)
                        if !exist {
                                continue
                        }
                        pq := v.(*processQueue)
-                       pc.removeUnnecessaryMessageQueue(&k, pq)
+                       pc.removeUnnecessaryMessageQueue(k, pq)
+                       pc.processQueueTable.Delete(k)
                }
        }
 }
diff --git a/internal/client.go b/internal/client.go
index 6e665ea..3a09ea8 100644
--- a/internal/client.go
+++ b/internal/client.go
@@ -88,6 +88,7 @@ type InnerConsumer interface {
        GetcType() string
        GetModel() string
        GetWhere() string
+       ResetOffset(topic string, table map[primitive.MessageQueue]int64)
 }
 
 func DefaultClientOptions() ClientOptions {
@@ -283,6 +284,23 @@ func GetOrNewRocketMQClient(option ClientOptions, callbackCh chan interface{}) R
                        }
                        return res
                })
+
+               client.remoteClient.RegisterRequestFunc(ReqResetConsumerOffset, func(req *remote.RemotingCommand, addr net.Addr) *remote.RemotingCommand {
+                       rlog.Info("receive reset consumer offset request...", map[string]interface{}{
+                               rlog.LogKeyBroker:        addr.String(),
+                               rlog.LogKeyTopic:         req.ExtFields["topic"],
+                               rlog.LogKeyConsumerGroup: req.ExtFields["group"],
+                               rlog.LogKeyTimeStamp:     req.ExtFields["timestamp"],
+                       })
+                       header := new(ResetOffsetHeader)
+                       header.Decode(req.ExtFields)
+
+                       body := new(ResetOffsetBody)
+                       body.Decode(req.Body)
+
+                       client.resetOffset(header.topic, header.group, body.OffsetTable)
+                       return nil
+               })
        }
        return actual.(*rmqClient)
 }
@@ -777,6 +795,15 @@ func (c *rmqClient) isNeedUpdateSubscribeInfo(topic string) bool {
        return result
 }
 
+func (c *rmqClient) resetOffset(topic string, group string, offsetTable map[primitive.MessageQueue]int64) {
+       consumer, exist := c.consumerMap.Load(group)
+       if !exist {
+               rlog.Warning("group "+group+" do not exists", nil)
+               return
+       }
+       consumer.(InnerConsumer).ResetOffset(topic, offsetTable)
+}
+
 func (c *rmqClient) getConsumerRunningInfo(group string) *ConsumerRunningInfo {
        consumer, exist := c.consumerMap.Load(group)
        if !exist {
diff --git a/internal/model.go b/internal/model.go
index 934c610..d7f2057 100644
--- a/internal/model.go
+++ b/internal/model.go
@@ -21,7 +21,9 @@ import (
        "bytes"
        "encoding/json"
        "fmt"
+       "github.com/tidwall/gjson"
        "sort"
+       "strconv"
        "strings"
 
        "github.com/apache/rocketmq-client-go/v2/internal/utils"
@@ -288,3 +290,48 @@ func (result ConsumeMessageDirectlyResult) Encode() ([]byte, error) {
        }
        return data, nil
 }
+
+type ResetOffsetBody struct {
+       OffsetTable map[primitive.MessageQueue]int64 `json:"offsetTable"`
+}
+
+func (resetOffsetBody *ResetOffsetBody) Decode(body []byte) {
+       result := gjson.ParseBytes(body)
+       rlog.Debug("offset table string "+result.Get("offsetTable").String(), nil)
+
+       offsetTable := make(map[primitive.MessageQueue]int64, 0)
+       offsetTableArray := strings.Split(result.Get("offsetTable").String(), "],[")
+       for index, v := range offsetTableArray {
+               kvArray := strings.Split(v, "},")
+
+               var kstr, vstr string
+               if index == len(offsetTableArray)-1 {
+                       vstr = kvArray[1][:len(kvArray[1])-2]
+               } else {
+                       vstr = kvArray[1]
+               }
+               offset, err := strconv.ParseInt(vstr, 10, 64)
+               if err != nil {
+                       rlog.Error("Unmarshal offset error", map[string]interface{}{
+                               rlog.LogKeyUnderlayError: err,
+                       })
+                       return
+               }
+
+               if index == 0 {
+                       kstr = kvArray[0][2:len(kvArray[0])] + "}"
+               } else {
+                       kstr = kvArray[0] + "}"
+               }
+               kObj := new(primitive.MessageQueue)
+               err = jsoniter.Unmarshal([]byte(kstr), &kObj)
+               if err != nil {
+                       rlog.Error("Unmarshal message queue error", map[string]interface{}{
+                               rlog.LogKeyUnderlayError: err,
+                       })
+                       return
+               }
+               offsetTable[*kObj] = offset
+       }
+       resetOffsetBody.OffsetTable = offsetTable
+}
diff --git a/internal/model_test.go b/internal/model_test.go
index 57ff0af..56eeb24 100644
--- a/internal/model_test.go
+++ b/internal/model_test.go
@@ -402,5 +402,21 @@ func TestConsumeMessageDirectlyResult_MarshalJSON(t *testing.T) {
                        fmt.Printf("json consumeMessageDirectlyResult: %s\n", string(data))
                })
        })
+}
 
+func TestRestOffsetBody_MarshalJSON(t *testing.T) {
+       Convey("test ResetOffset Body Decode", t, func() {
+               body := 
"{\"offsetTable\":[[{\"topic\":\"zx_tst\",\"brokerName\":\"tjwqtst-common-rocketmq-raft0\",\"queueId\":5},23354233],[{\"topic\":\"zx_tst\",\"brokerName\":\"tjwqtst-common-rocketmq-raft0\",\"queueId\":4},23354245],[{\"topic\":\"zx_tst\",\"brokerName\":\"tjwqtst-common-rocketmq-raft0\",\"queueId\":7},23354203],[{\"topic\":\"zx_tst\",\"brokerName\":\"tjwqtst-common-rocketmq-raft0\",\"queueId\":6},23354312],[{\"topic\":\"zx_tst\",\"brokerName\":\"tjwqtst-common-rocketmq-raft0\",\"q
 [...]
+               resetOffsetBody := new(ResetOffsetBody)
+               resetOffsetBody.Decode([]byte(body))
+               offsetTable := resetOffsetBody.OffsetTable
+               So(offsetTable, ShouldNotBeNil)
+               So(len(offsetTable), ShouldEqual, 8)
+               messageQueue := primitive.MessageQueue{
+                       Topic:      "zx_tst",
+                       BrokerName: "tjwqtst-common-rocketmq-raft0",
+                       QueueId:    5,
+               }
+               So(offsetTable[messageQueue], ShouldEqual, 23354233)
+       })
 }
diff --git a/internal/request.go b/internal/request.go
index 237d711..0e3d8e1 100644
--- a/internal/request.go
+++ b/internal/request.go
@@ -48,7 +48,7 @@ const (
        ReqGetAllTopicListFromNameServer = int16(206)
        ReqDeleteTopicInBroker           = int16(215)
        ReqDeleteTopicInNameSrv          = int16(216)
-       ReqResetConsuemrOffset           = int16(220)
+       ReqResetConsumerOffset           = int16(220)
        ReqGetConsumerRunningInfo        = int16(307)
        ReqConsumeMessageDirectly        = int16(309)
 )
@@ -408,6 +408,39 @@ func (request *DeleteTopicRequestHeader) Encode() map[string]string {
        return maps
 }
 
+type ResetOffsetHeader struct {
+       topic     string
+       group     string
+       timestamp int64
+       isForce   bool
+}
+
+func (request *ResetOffsetHeader) Encode() map[string]string {
+       maps := make(map[string]string)
+       maps["topic"] = request.topic
+       maps["group"] = request.group
+       maps["timestamp"] = strconv.FormatInt(request.timestamp, 10)
+       return maps
+}
+
+func (request *ResetOffsetHeader) Decode(properties map[string]string) {
+       if len(properties) == 0 {
+               return
+       }
+
+       if v, existed := properties["topic"]; existed {
+               request.topic = v
+       }
+
+       if v, existed := properties["group"]; existed {
+               request.group = v
+       }
+
+       if v, existed := properties["timestamp"]; existed {
+               request.timestamp, _ = strconv.ParseInt(v, 10, 0)
+       }
+}
+
 type ConsumeMessageDirectlyHeader struct {
        consumerGroup string
        clientID      string
diff --git a/rlog/log.go b/rlog/log.go
index 1d850c3..382f5aa 100644
--- a/rlog/log.go
+++ b/rlog/log.go
@@ -33,6 +33,7 @@ const (
        LogKeyValueChangedFrom = "changedFrom"
        LogKeyValueChangedTo   = "changeTo"
        LogKeyPullRequest      = "PullRequest"
+       LogKeyTimeStamp        = "timestamp"
 )
 
 type Logger interface {

Reply via email to