This is an automated email from the ASF dual-hosted git repository.

dinglei pushed a commit to branch native
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git


The following commit(s) were added to refs/heads/native by this push:
     new e6de4b4  Add OffsetStore for Consumer (#49)
e6de4b4 is described below

commit e6de4b44f90d52053316c59359b32b3b679babf1
Author: wenfeng <[email protected]>
AuthorDate: Tue Apr 30 22:08:13 2019 +0800

    Add OffsetStore for Consumer (#49)
    
    * add impl of LocalOffsetStoreage
    
    * add impl of RemoteBrokerStore
    
    * fix offset bugs
---
 consumer/consumer.go                               |  36 ++-
 .../producer/main.go => consumer/consumer_test.go  |  32 +--
 consumer/offset_store.go                           | 302 ++++++++++++++++++++-
 consumer/process_queue.go                          |  16 +-
 consumer/push_consumer.go                          |  29 +-
 examples/producer/main.go                          |   2 +-
 kernel/client.go                                   | 140 ++++++----
 kernel/request.go                                  |  59 +++-
 kernel/route.go                                    |   2 +-
 remote/remote_client.go                            |   2 +-
 remote/remote_client_test.go                       |   2 +-
 examples/producer/main.go => utils/errors.go       |  29 +-
 utils/files.go                                     |  65 +++++
 13 files changed, 565 insertions(+), 151 deletions(-)

diff --git a/consumer/consumer.go b/consumer/consumer.go
index 60d3dd5..f128f66 100644
--- a/consumer/consumer.go
+++ b/consumer/consumer.go
@@ -193,6 +193,14 @@ func (pr *PullRequest) String() string {
 
 type ConsumerOption struct {
        kernel.ClientOption
+       /**
+        * Backtracking consumption time with second precision. Time format is
+        * 20131223171201<br>
+        * Implying Seventeen twelve and 01 seconds on December 23, 2013 
year<br>
+        * Default backtracking consumption time Half an hour ago.
+        */
+       ConsumeTimestamp string
+
        // The socket timeout in milliseconds
        ConsumerPullTimeout time.Duration
 
@@ -549,7 +557,7 @@ func (dc *defaultConsumer) doUnlock(addr string, body 
*lockBatchRequestBody, one
        data, _ := json.Marshal(body)
        request := remote.NewRemotingCommand(kernel.ReqUnlockBatchMQ, nil, data)
        if oneway {
-               err := remote.InvokeOneWay(addr, request)
+               err := remote.InvokeOneWay(addr, request, 3*time.Second)
                if err != nil {
                        rlog.Errorf("lock mq to broker with oneway: %s error 
%s", addr, err.Error())
                }
@@ -581,6 +589,7 @@ func (dc *defaultConsumer) 
buildProcessQueueTableByBrokerName() map[string][]*ke
        return result
 }
 
+// TODO 问题不少 需要再好好对一下
 func (dc *defaultConsumer) updateProcessQueueTable(topic string, mqs 
[]*kernel.MessageQueue) bool {
        var changed bool
        mqSet := make(map[*kernel.MessageQueue]bool)
@@ -614,6 +623,10 @@ func (dc *defaultConsumer) updateProcessQueueTable(topic 
string, mqs []*kernel.M
 
        if dc.cType == _PushConsume {
                for mq := range mqSet {
+                       _, exist := dc.processQueueTable.Load(mq)
+                       if exist {
+                               continue
+                       }
                        if dc.consumeOrderly && !dc.lock(mq) {
                                rlog.Warnf("do defaultConsumer, Group:%s add a 
new mq failed, %s, because lock failed",
                                        dc.consumerGroup, mq.String())
@@ -669,13 +682,17 @@ func (dc *defaultConsumer) computePullFromWhere(mq 
*kernel.MessageQueue) int64 {
                case ConsumeFromLastOffset:
                        if lastOffset == -1 {
                                if strings.HasPrefix(mq.Topic, 
kernel.RetryGroupTopicPrefix) {
-                                       lastOffset, err := 
kernel.QueryMaxOffset(mq.Topic, mq.QueueId)
+                                       lastOffset = 0
+                               } else {
+                                       lastOffset, err := 
kernel.QueryMaxOffset(mq)
                                        if err == nil {
                                                result = lastOffset
                                        } else {
                                                rlog.Warnf("query max offset 
of: [%s:%d] error, %s", mq.Topic, mq.QueueId, err.Error())
                                        }
                                }
+                       } else {
+                               result = -1
                        }
                case ConsumeFromFirstOffset:
                        if lastOffset == -1 {
@@ -684,14 +701,25 @@ func (dc *defaultConsumer) computePullFromWhere(mq 
*kernel.MessageQueue) int64 {
                case ConsumeFromTimestamp:
                        if lastOffset == -1 {
                                if strings.HasPrefix(mq.Topic, 
kernel.RetryGroupTopicPrefix) {
-                                       lastOffset, err := 
kernel.QueryMaxOffset(mq.Topic, mq.QueueId)
+                                       lastOffset, err := 
kernel.QueryMaxOffset(mq)
                                        if err == nil {
                                                result = lastOffset
                                        } else {
+                                               result = -1
                                                rlog.Warnf("query max offset 
of: [%s:%d] error, %s", mq.Topic, mq.QueueId, err.Error())
                                        }
                                } else {
-                                       // TODO parse timestamp
+                                       t, err := time.Parse("20060102150405", 
dc.option.ConsumeTimestamp)
+                                       if err != nil {
+                                               result = -1
+                                       } else {
+                                               lastOffset, err := 
kernel.SearchOffsetByTimestamp(mq, t.Unix())
+                                               if err != nil {
+                                                       result = -1
+                                               } else {
+                                                       result = lastOffset
+                                               }
+                                       }
                                }
                        }
                default:
diff --git a/examples/producer/main.go b/consumer/consumer_test.go
similarity index 53%
copy from examples/producer/main.go
copy to consumer/consumer_test.go
index 5c12584..2eb3f24 100644
--- a/examples/producer/main.go
+++ b/consumer/consumer_test.go
@@ -15,33 +15,17 @@ See the License for the specific language governing 
permissions and
 limitations under the License.
 */
 
-package main
+package consumer
 
 import (
-       "fmt"
-       "github.com/apache/rocketmq-client-go/consumer"
-       "github.com/apache/rocketmq-client-go/kernel"
-       "os"
+       "github.com/stretchr/testify/assert"
+       "testing"
        "time"
 )
 
-func main() {
-       c := consumer.NewPushConsumer("testGroup", consumer.ConsumerOption{
-               ConsumerModel: consumer.Clustering,
-               FromWhere:     consumer.ConsumeFromFirstOffset,
-       })
-       err := c.Subscribe("testTopic", consumer.MessageSelector{}, func(ctx 
*consumer.ConsumeMessageContext,
-               msgs []*kernel.MessageExt) (consumer.ConsumeResult, error) {
-               fmt.Println(msgs)
-               return consumer.ConsumeSuccess, nil
-       })
-       if err != nil {
-               fmt.Println(err.Error())
-       }
-       err = c.Start()
-       if err != nil {
-               fmt.Println(err.Error())
-               os.Exit(-1)
-       }
-       time.Sleep(time.Hour)
+func TestParseTimestamp(t *testing.T) {
+       layout := "20060102150405"
+       timestamp, err := time.ParseInLocation(layout, "20190430193409", 
time.Local)
+       assert.Nil(t, err)
+       assert.Equal(t, int64(1556624049), timestamp.Unix())
 }
diff --git a/consumer/offset_store.go b/consumer/offset_store.go
index a11280d..6b6d719 100644
--- a/consumer/offset_store.go
+++ b/consumer/offset_store.go
@@ -17,7 +17,19 @@ limitations under the License.
 
 package consumer
 
-import "github.com/apache/rocketmq-client-go/kernel"
+import (
+       "encoding/json"
+       "fmt"
+       "github.com/apache/rocketmq-client-go/kernel"
+       "github.com/apache/rocketmq-client-go/remote"
+       "github.com/apache/rocketmq-client-go/rlog"
+       "github.com/apache/rocketmq-client-go/utils"
+       "os"
+       "path/filepath"
+       "strconv"
+       "sync"
+       "time"
+)
 
 type readType int
 
@@ -27,6 +39,16 @@ const (
        _ReadMemoryThenStore
 )
 
+var (
+       _LocalOffsetStorePath = os.Getenv("rocketmq.client.localOffsetStoreDir")
+)
+
+func init() {
+       if _LocalOffsetStorePath == "" {
+               _LocalOffsetStorePath = filepath.Join(os.Getenv("user.home"), 
".rocketmq_client_go")
+       }
+}
+
 type OffsetStore interface {
        load()
        persist(mqs []*kernel.MessageQueue)
@@ -36,20 +58,280 @@ type OffsetStore interface {
 }
 
 type localFileOffsetStore struct {
+       group       string
+       path        string
+       OffsetTable map[string]map[int]*queueOffset `json:"OffsetTable"`
+       // mutex for offset file
+       mutex sync.Mutex
 }
 
-func (local *localFileOffsetStore) load()                                      
                     {}
-func (local *localFileOffsetStore) persist(mqs []*kernel.MessageQueue)         
                     {}
-func (local *localFileOffsetStore) remove(mq *kernel.MessageQueue)             
                     {}
-func (local *localFileOffsetStore) read(mq *kernel.MessageQueue, t readType) 
int64                  { return 0 }
-func (local *localFileOffsetStore) update(mq *kernel.MessageQueue, offset 
int64, increaseOnly bool) {}
+type queueOffset struct {
+       QueueID int    `json:"queueId"`
+       Broker  string `json:"brokerName"`
+       Offset  int64  `json:"offset"`
+}
+
+func NewLocalFileOffsetStore(clientID, group string) OffsetStore {
+       store := &localFileOffsetStore{
+               group: group,
+               path:  filepath.Join(_LocalOffsetStorePath, clientID, group, 
"offset.json"),
+       }
+       store.load()
+       return store
+}
+
+func (local *localFileOffsetStore) load() {
+       local.mutex.Lock()
+       defer local.mutex.Unlock()
+       data, err := utils.FileReadAll(local.path)
+       if err != nil {
+               data, err = utils.FileReadAll(filepath.Join(local.path, ".bak"))
+       }
+       if err != nil {
+               rlog.Debugf("load local offset: %s error: %s", local.path, 
err.Error())
+               return
+       }
+       err = json.Unmarshal(data, local)
+       if err != nil {
+               rlog.Debugf("unmarshal local offset: %s error: %s", local.path, 
err.Error())
+               return
+       }
+}
+
+func (local *localFileOffsetStore) read(mq *kernel.MessageQueue, t readType) 
int64 {
+       if t == _ReadFromMemory || t == _ReadMemoryThenStore {
+               off := readFromMemory(local.OffsetTable, mq)
+               if off >= 0 || (off == -1 && t == _ReadFromMemory) {
+                       return off
+               }
+       }
+       local.load()
+       return readFromMemory(local.OffsetTable, mq)
+}
+
+func (local *localFileOffsetStore) update(mq *kernel.MessageQueue, offset 
int64, increaseOnly bool) {
+       rlog.Infof("update offset: %s to %d", mq, offset)
+       localOffset, exist := local.OffsetTable[mq.Topic]
+       if !exist {
+               localOffset = make(map[int]*queueOffset)
+               local.OffsetTable[mq.Topic] = localOffset
+       }
+       q, exist := localOffset[mq.QueueId]
+       if !exist {
+               q = &queueOffset{
+                       QueueID: mq.QueueId,
+                       Broker:  mq.BrokerName,
+               }
+               localOffset[mq.QueueId] = q
+       }
+       if increaseOnly {
+               if q.Offset < offset {
+                       q.Offset = offset
+               }
+       } else {
+               q.Offset = offset
+       }
+}
+
+func (local *localFileOffsetStore) persist(mqs []*kernel.MessageQueue) {
+       if len(mqs) == 0 {
+               return
+       }
+       s := new(struct {
+               OffsetTable map[string]map[int]*queueOffset `json:"offsetTable"`
+       })
+       table := make(map[string]map[int]*queueOffset)
+       for idx := range mqs {
+               mq := mqs[idx]
+               offsets, exist := local.OffsetTable[mq.Topic]
+               if !exist {
+                       continue
+               }
+               off, exist := offsets[mq.QueueId]
+               if !exist {
+                       continue
+               }
+
+               offsets, exist = table[mq.Topic]
+               if !exist {
+                       offsets = make(map[int]*queueOffset)
+               }
+               offsets[off.QueueID] = off
+       }
+       data, _ := json.Marshal(s)
+       utils.CheckError(fmt.Sprintf("persist offset to %s", local.path), 
utils.WriteToFile(local.path, data))
+}
+
+func (local *localFileOffsetStore) remove(mq *kernel.MessageQueue) {
+       // unsupported
+}
 
 type remoteBrokerOffsetStore struct {
+       group       string
+       OffsetTable map[string]map[int]*queueOffset `json:"OffsetTable"`
+       mutex       sync.RWMutex
+}
+
+func NewRemoteOffsetStore(group string) OffsetStore {
+       return &remoteBrokerOffsetStore{
+               group:       group,
+               OffsetTable: make(map[string]map[int]*queueOffset),
+       }
+}
+
+func (remote *remoteBrokerOffsetStore) load() {
+       // unsupported
+}
+
+func (remote *remoteBrokerOffsetStore) persist(mqs []*kernel.MessageQueue) {
+       remote.mutex.Lock()
+       defer remote.mutex.Unlock()
+       if len(mqs) == 0 {
+               return
+       }
+       for idx := range mqs {
+               mq := mqs[idx]
+               offsets, exist := remote.OffsetTable[mq.Topic]
+               if !exist {
+                       continue
+               }
+               off, exist := offsets[mq.QueueId]
+               if !exist {
+                       continue
+               }
+
+               err := updateConsumeOffsetToBroker(remote.group, mq.Topic, off)
+               if err != nil {
+                       rlog.Warnf("update offset to broker error: %s, group: 
%s, queue: %s, offset: %d",
+                               err.Error(), remote.group, mq.String(), 
off.Offset)
+               } else {
+                       rlog.Infof("update offset to broker success, group: %s, 
topic: %s, queue: %v", remote.group, mq.Topic, off)
+               }
+       }
+}
+
+func (remote *remoteBrokerOffsetStore) remove(mq *kernel.MessageQueue) {
+       remote.mutex.Lock()
+       defer remote.mutex.Unlock()
+       if mq == nil {
+               return
+       }
+       offset, exist := remote.OffsetTable[mq.Topic]
+       if !exist {
+               return
+       }
+       rlog.Infof("delete: %s", mq.String())
+       delete(offset, mq.QueueId)
+}
+
+func (remote *remoteBrokerOffsetStore) read(mq *kernel.MessageQueue, t 
readType) int64 {
+       remote.mutex.RLock()
+       if t == _ReadFromMemory || t == _ReadMemoryThenStore {
+               off := readFromMemory(remote.OffsetTable, mq)
+               if off >= 0 || (off == -1 && t == _ReadFromMemory) {
+                       remote.mutex.RUnlock()
+                       return off
+               }
+       }
+       off, err := fetchConsumeOffsetFromBroker(remote.group, mq)
+       if err != nil {
+               rlog.Errorf("fetch offset of %s error: %s", mq.String(), 
err.Error())
+               remote.mutex.RUnlock()
+               return -1
+       }
+       remote.mutex.RUnlock()
+       remote.update(mq, off, true)
+       return off
 }
 
-func (remote *remoteBrokerOffsetStore) load()                                  
        {}
-func (remote *remoteBrokerOffsetStore) persist(mqs []*kernel.MessageQueue)     
        {}
-func (remote *remoteBrokerOffsetStore) remove(mq *kernel.MessageQueue)         
        {}
-func (remote *remoteBrokerOffsetStore) read(mq *kernel.MessageQueue, t 
readType) int64 { return 0 }
 func (remote *remoteBrokerOffsetStore) update(mq *kernel.MessageQueue, offset 
int64, increaseOnly bool) {
+       rlog.Infof("update offset: %s to %d", mq, offset)
+       remote.mutex.Lock()
+       defer remote.mutex.Unlock()
+       localOffset, exist := remote.OffsetTable[mq.Topic]
+       if !exist {
+               localOffset = make(map[int]*queueOffset)
+               remote.OffsetTable[mq.Topic] = localOffset
+       }
+       q, exist := localOffset[mq.QueueId]
+       if !exist {
+               rlog.Infof("new queueOffset: %d, off: %d", mq.QueueId, offset)
+               q = &queueOffset{
+                       QueueID: mq.QueueId,
+                       Broker:  mq.BrokerName,
+               }
+               localOffset[mq.QueueId] = q
+       }
+       if increaseOnly {
+               if q.Offset < offset {
+                       q.Offset = offset
+               }
+       } else {
+               q.Offset = offset
+       }
+}
+
+func readFromMemory(table map[string]map[int]*queueOffset, mq 
*kernel.MessageQueue) int64 {
+       localOffset, exist := table[mq.Topic]
+       if !exist {
+               return -1
+       }
+       off, exist := localOffset[mq.QueueId]
+       if !exist {
+               return -1
+       }
+
+       return off.Offset
+}
+
+func fetchConsumeOffsetFromBroker(group string, mq *kernel.MessageQueue) 
(int64, error) {
+       broker := kernel.FindBrokerAddrByName(mq.BrokerName)
+       if broker == "" {
+               kernel.UpdateTopicRouteInfo(mq.Topic)
+               broker = kernel.FindBrokerAddrByName(mq.BrokerName)
+       }
+       if broker == "" {
+               return int64(-1), fmt.Errorf("broker: %s address not found", 
mq.BrokerName)
+       }
+       queryOffsetRequest := &kernel.QueryConsumerOffsetRequest{
+               ConsumerGroup: group,
+               Topic:         mq.Topic,
+               QueueId:       mq.QueueId,
+       }
+       cmd := remote.NewRemotingCommand(kernel.ReqQueryConsumerOffset, 
queryOffsetRequest, nil)
+       res, err := remote.InvokeSync(broker, cmd, 3*time.Second)
+       if err != nil {
+               return -1, err
+       }
+       if res.Code != kernel.ResSuccess {
+               return -2, fmt.Errorf("broker response code: %d, remarks: %s", 
res.Code, res.Remark)
+       }
+
+       off, err := strconv.ParseInt(res.ExtFields["offset"], 10, 64)
+
+       if err != nil {
+               return -1, err
+       }
+
+       return off, nil
+}
+
+func updateConsumeOffsetToBroker(group, topic string, queue *queueOffset) 
error {
+       broker := kernel.FindBrokerAddrByName(queue.Broker)
+       if broker == "" {
+               kernel.UpdateTopicRouteInfo(topic)
+               broker = kernel.FindBrokerAddrByName(queue.Broker)
+       }
+       if broker == "" {
+               return fmt.Errorf("broker: %s address not found", queue.Broker)
+       }
+
+       updateOffsetRequest := &kernel.UpdateConsumerOffsetRequest{
+               ConsumerGroup: group,
+               Topic:         topic,
+               QueueId:       queue.QueueID,
+               CommitOffset:  queue.Offset,
+       }
+       cmd := remote.NewRemotingCommand(kernel.ReqUpdateConsumerOffset, 
updateOffsetRequest, nil)
+       return remote.InvokeOneWay(broker, cmd, 5*time.Second)
 }
diff --git a/consumer/process_queue.go b/consumer/process_queue.go
index 0db0766..f4367f9 100644
--- a/consumer/process_queue.go
+++ b/consumer/process_queue.go
@@ -64,20 +64,26 @@ func (pq *ProcessQueue) putMessage(messages 
[]*kernel.MessageExt) {
        localList := list.New()
        for idx := range messages {
                localList.PushBack(messages[idx])
+               pq.queueOffsetMax = messages[idx].QueueOffset
        }
        pq.mutex.Lock()
        pq.msgCache.PushBackList(localList)
        pq.mutex.Unlock()
 }
 
-func (pq *ProcessQueue) removeMessage(number int) int {
-       i := 0
+func (pq *ProcessQueue) removeMessage(number int) int64 {
+       result := pq.queueOffsetMax + 1
        pq.mutex.Lock()
-       for ; i < number && pq.msgCache.Len() > 0; i++ {
-               pq.msgCache.Remove(pq.msgCache.Front())
+       for i := 0; i < number && pq.msgCache.Len() > 0; i++ {
+               head := pq.msgCache.Front()
+               pq.msgCache.Remove(head)
+               result = head.Value.(*kernel.MessageExt).QueueOffset
        }
        pq.mutex.Unlock()
-       return i
+       if pq.msgCache.Len() > 0 {
+               result = 
pq.msgCache.Front().Value.(*kernel.MessageExt).QueueOffset
+       }
+       return result
 }
 
 func (pq *ProcessQueue) takeMessages(number int) []*kernel.MessageExt {
diff --git a/consumer/push_consumer.go b/consumer/push_consumer.go
index 58f4c68..fc78e3e 100644
--- a/consumer/push_consumer.go
+++ b/consumer/push_consumer.go
@@ -20,7 +20,6 @@ package consumer
 import (
        "context"
        "errors"
-       "fmt"
        "github.com/apache/rocketmq-client-go/kernel"
        "github.com/apache/rocketmq-client-go/rlog"
        "math"
@@ -53,13 +52,6 @@ type PushConsumer interface {
 
 type pushConsumer struct {
        *defaultConsumer
-       /**
-        * Backtracking consumption time with second precision. Time format is
-        * 20131223171201<br>
-        * Implying Seventeen twelve and 01 seconds on December 23, 2013 
year<br>
-        * Default backtracking consumption time Half an hour ago.
-        */
-       ConsumeTimestamp             time.Duration
        queueFlowControlTimes        int
        queueMaxSpanFlowControlTimes int
        consume                      func(*ConsumeMessageContext, 
[]*kernel.MessageExt) (ConsumeResult, error)
@@ -97,9 +89,8 @@ func NewPushConsumer(consumerGroup string, opt 
ConsumerOption) PushConsumer {
        }
 
        p := &pushConsumer{
-               defaultConsumer:  dc,
-               ConsumeTimestamp: 30 * time.Minute,
-               subscribedTopic:  make(map[string]string, 0),
+               defaultConsumer: dc,
+               subscribedTopic: make(map[string]string, 0),
        }
        dc.mqChanged = p.messageQueueChanged
        if p.consumeOrderly {
@@ -118,8 +109,8 @@ func (pc *pushConsumer) Start() error {
                pc.state = kernel.StateStartFailed
                pc.validate()
 
-               // set retry topic
                if pc.model == Clustering {
+                       // set retry topic
                        retryTopic := kernel.GetRetryTopic(pc.consumerGroup)
                        pc.subscriptionDataTable.Store(retryTopic, 
buildSubscriptionData(retryTopic,
                                MessageSelector{TAG, _SubAll}))
@@ -128,9 +119,9 @@ func (pc *pushConsumer) Start() error {
                pc.client = 
kernel.GetOrNewRocketMQClient(pc.option.ClientOption)
                if pc.model == Clustering {
                        pc.option.ChangeInstanceNameToPID()
-                       pc.storage = &remoteBrokerOffsetStore{}
+                       pc.storage = NewRemoteOffsetStore(pc.consumerGroup)
                } else {
-                       pc.storage = &localFileOffsetStore{}
+                       pc.storage = NewLocalFileOffsetStore(pc.consumerGroup, 
pc.client.ClientID())
                }
                pc.storage.load()
                go func() {
@@ -139,7 +130,6 @@ func (pc *pushConsumer) Start() error {
                        for {
                                pr := <-pc.prCh
                                go func() {
-                                       fmt.Println(pr.String())
                                        pc.pullMessage(&pr)
                                }()
                        }
@@ -417,7 +407,7 @@ func (pc *pushConsumer) pullMessage(request *PullRequest) {
                        QueueOffset:    request.nextOffset,
                        MaxMsgNums:     pc.option.PullBatchSize,
                        SysFlag:        sysFlag,
-                       CommitOffset:   0,
+                       CommitOffset:   commitOffsetValue,
                        SubExpression:  _SubAll,
                        ExpressionType: string(TAG), // TODO
                }
@@ -428,7 +418,6 @@ func (pc *pushConsumer) pullMessage(request *PullRequest) {
                //      pullRequest.SubVersion = data.SubVersion
                //}
 
-               //ch := make(chan *kernel.PullResult)
                brokerResult := tryFindBroker(request.mq)
                if brokerResult == nil {
                        rlog.Warnf("no broker found for %s", 
request.mq.String())
@@ -437,13 +426,13 @@ func (pc *pushConsumer) pullMessage(request *PullRequest) 
{
                }
                result, err := pc.client.PullMessage(context.Background(), 
brokerResult.BrokerAddr, pullRequest)
                if err != nil {
-                       rlog.Warnf("pull message from %s error: %s", 
"127.0.0.1:10911", err.Error())
+                       rlog.Warnf("pull message from %s error: %s", 
brokerResult.BrokerAddr, err.Error())
                        sleepTime = _PullDelayTimeWhenError
                        goto NEXT
                }
 
                if result.Status == kernel.PullBrokerTimeout {
-                       rlog.Warnf("pull broker: %s timeout", "127.0.0.1:10911")
+                       rlog.Warnf("pull broker: %s timeout", 
brokerResult.BrokerAddr)
                        sleepTime = _PullDelayTimeWhenError
                        goto NEXT
                }
@@ -486,7 +475,7 @@ func (pc *pushConsumer) pullMessage(request *PullRequest) {
                                rlog.Warnf("fix the pull request offset: %s", 
request.String())
                        }()
                default:
-                       rlog.Warnf("")
+                       rlog.Warnf("unknown pull status: %v", result.Status)
                        sleepTime = _PullDelayTimeWhenError
                }
        }
diff --git a/examples/producer/main.go b/examples/producer/main.go
index 5c12584..e2d0126 100644
--- a/examples/producer/main.go
+++ b/examples/producer/main.go
@@ -30,7 +30,7 @@ func main() {
                ConsumerModel: consumer.Clustering,
                FromWhere:     consumer.ConsumeFromFirstOffset,
        })
-       err := c.Subscribe("testTopic", consumer.MessageSelector{}, func(ctx 
*consumer.ConsumeMessageContext,
+       err := c.Subscribe("test", consumer.MessageSelector{}, func(ctx 
*consumer.ConsumeMessageContext,
                msgs []*kernel.MessageExt) (consumer.ConsumeResult, error) {
                fmt.Println(msgs)
                return consumer.ConsumeSuccess, nil
diff --git a/kernel/client.go b/kernel/client.go
index e83d598..8403ccb 100644
--- a/kernel/client.go
+++ b/kernel/client.go
@@ -101,63 +101,66 @@ type RMQClient struct {
 
        // group -> InnerConsumer
        consumerMap sync.Map
+       once        sync.Once
 }
 
 var clientMap sync.Map
 
 func GetOrNewRocketMQClient(option ClientOption) *RMQClient {
-       // TODO
-       return &RMQClient{option: option}
+       client := &RMQClient{option: option}
+       actual, _ := clientMap.LoadOrStore(client.ClientID(), client)
+       return actual.(*RMQClient)
 }
 
 func (c *RMQClient) Start() {
-       // TODO fetchNameServerAddr
-       go func() {}()
-
-       // schedule update route info
-       go func() {
-               // delay
-               time.Sleep(50 * time.Millisecond)
-               for {
-                       c.UpdateTopicRouteInfo()
-                       time.Sleep(_PullNameServerInterval)
-               }
-       }()
-
-       // TODO cleanOfflineBroker & sendHeartbeatToAllBrokerWithLock
-       go func() {}()
-
-       // schedule persist offset
-       go func() {
-               time.Sleep(10 * time.Second)
-               for {
-                       c.consumerMap.Range(func(key, value interface{}) bool {
-                               consumer := value.(InnerConsumer)
-                               consumer.PersistConsumerOffset()
-                               return true
-                       })
-                       time.Sleep(_PersistOffset)
-               }
-       }()
+       c.once.Do(func() {
+               // TODO fetchNameServerAddr
+               go func() {}()
+
+               // schedule update route info
+               go func() {
+                       // delay
+                       time.Sleep(50 * time.Millisecond)
+                       for {
+                               c.UpdateTopicRouteInfo()
+                               time.Sleep(_PullNameServerInterval)
+                       }
+               }()
+
+               // TODO cleanOfflineBroker & sendHeartbeatToAllBrokerWithLock
+               go func() {}()
+
+               // schedule persist offset
+               go func() {
+                       time.Sleep(10 * time.Second)
+                       for {
+                               c.consumerMap.Range(func(key, value 
interface{}) bool {
+                                       consumer := value.(InnerConsumer)
+                                       consumer.PersistConsumerOffset()
+                                       return true
+                               })
+                               time.Sleep(_PersistOffset)
+                       }
+               }()
 
-       go func() {
-               for {
-                       c.RebalanceImmediately()
-                       time.Sleep(time.Second)
-               }
-       }()
+               go func() {
+                       for {
+                               c.RebalanceImmediately()
+                               time.Sleep(time.Second)
+                       }
+               }()
+       })
 }
 
 func (c *RMQClient) ClientID() string {
-       //id := c.option.ClientIP + "@" + c.option.InstanceName
-       //if c.option.UnitName != "" {
-       //      id += "@" + c.option.UnitName
-       //}
-       return "127.0.0.1:10911@DEFAULT"
+       id := c.option.ClientIP + "@" + c.option.InstanceName
+       if c.option.UnitName != "" {
+               id += "@" + c.option.UnitName
+       }
+       return id
 }
 
 func (c *RMQClient) CheckClientInBroker() {
-
 }
 
 func (c *RMQClient) SendHeartbeatToAllBrokerWithLock() {
@@ -269,7 +272,7 @@ func (c *RMQClient) SendMessageAsync(ctx context.Context, 
brokerAddrs, brokerNam
 func (c *RMQClient) SendMessageOneWay(ctx context.Context, brokerAddrs string, 
request *SendMessageRequest,
        msgs []*Message) (*SendResult, error) {
        cmd := remote.NewRemotingCommand(ReqSendBatchMessage, request, 
encodeMessages(msgs))
-       err := remote.InvokeOneWay(brokerAddrs, cmd)
+       err := remote.InvokeOneWay(brokerAddrs, cmd, 3*time.Second)
        if err != nil {
                rlog.Warnf("send messages with oneway error: %v", err)
        }
@@ -379,18 +382,59 @@ func (c *RMQClient) PullMessageAsync(ctx context.Context, 
brokerAddrs string, re
 }
 
 // QueryMaxOffset with specific queueId and topic
-func QueryMaxOffset(topic string, queueId int) (int64, error) {
-       return 0, nil
+func QueryMaxOffset(mq *MessageQueue) (int64, error) {
+       brokerAddr := FindBrokerAddrByName(mq.BrokerName)
+       if brokerAddr == "" {
+               UpdateTopicRouteInfo(mq.Topic)
+               brokerAddr = FindBrokerAddrByName(mq.Topic)
+       }
+       if brokerAddr == "" {
+               return -1, fmt.Errorf("the broker [%s] does not exist", 
mq.BrokerName)
+       }
+
+       request := &GetMaxOffsetRequest{
+               Topic:   mq.Topic,
+               QueueId: mq.QueueId,
+       }
+
+       cmd := remote.NewRemotingCommand(ReqGetMaxOffset, request, nil)
+       response, err := remote.InvokeSync(brokerAddr, cmd, 3*time.Second)
+       if err != nil {
+               return -1, err
+       }
+
+       return strconv.ParseInt(response.ExtFields["offset"], 10, 64)
 }
 
 // QueryConsumerOffset with specific queueId and topic of consumerGroup
-func (c *RMQClient) QueryConsumerOffset(consumerGroup, topic string, queue 
int) (int64, error) {
+func (c *RMQClient) QueryConsumerOffset(consumerGroup, mq *MessageQueue) 
(int64, error) {
        return 0, nil
 }
 
 // SearchOffsetByTimestamp with specific queueId and topic
-func (c *RMQClient) SearchOffsetByTimestamp(topic string, queue int, timestamp 
int64) (int64, error) {
-       return 0, nil
+func SearchOffsetByTimestamp(mq *MessageQueue, timestamp int64) (int64, error) 
{
+       brokerAddr := FindBrokerAddrByName(mq.BrokerName)
+       if brokerAddr == "" {
+               UpdateTopicRouteInfo(mq.Topic)
+               brokerAddr = FindBrokerAddrByName(mq.Topic)
+       }
+       if brokerAddr == "" {
+               return -1, fmt.Errorf("the broker [%s] does not exist", 
mq.BrokerName)
+       }
+
+       request := &SearchOffsetRequest{
+               Topic:     mq.Topic,
+               QueueId:   mq.QueueId,
+               Timestamp: timestamp,
+       }
+
+       cmd := remote.NewRemotingCommand(ReqSearchOffsetByTimestamp, request, 
nil)
+       response, err := remote.InvokeSync(brokerAddr, cmd, 3*time.Second)
+       if err != nil {
+               return -1, err
+       }
+
+       return strconv.ParseInt(response.ExtFields["offset"], 10, 64)
 }
 
 // UpdateConsumerOffset with specific queueId and topic
diff --git a/kernel/request.go b/kernel/request.go
index 2647ad5..36770ec 100644
--- a/kernel/request.go
+++ b/kernel/request.go
@@ -19,17 +19,22 @@ package kernel
 
 import (
        "fmt"
+       "strconv"
        "time"
 )
 
 const (
-       ReqPullMessage            = int16(11)
-       ReqHeartBeat              = int16(34)
-       ReqGetConsumerListByGroup = int16(38)
-       ReqLockBatchMQ            = int16(41)
-       ReqUnlockBatchMQ          = int16(42)
-       ReqGetRouteInfoByTopic    = int16(105)
-       ReqSendBatchMessage       = int16(320)
+       ReqPullMessage             = int16(11)
+       ReqQueryConsumerOffset     = int16(14)
+       ReqUpdateConsumerOffset    = int16(15)
+       ReqSearchOffsetByTimestamp = int16(30)
+       ReqGetMaxOffset            = int16(30)
+       ReqHeartBeat               = int16(34)
+       ReqGetConsumerListByGroup  = int16(38)
+       ReqLockBatchMQ             = int16(41)
+       ReqUnlockBatchMQ           = int16(42)
+       ReqGetRouteInfoByTopic     = int16(105)
+       ReqSendBatchMessage        = int16(320)
 )
 
 type SendMessageRequest struct {
@@ -97,28 +102,60 @@ func (request *GetConsumerList) Encode() map[string]string 
{
 
 type GetMaxOffsetRequest struct {
        Topic   string `json:"topic"`
-       QueueId int32  `json:"queueId"`
+       QueueId int    `json:"queueId"`
+}
+
+func (request *GetMaxOffsetRequest) Encode() map[string]string {
+       maps := make(map[string]string)
+       maps["topic"] = request.Topic
+       maps["queueId"] = strconv.Itoa(request.QueueId)
+       return maps
 }
 
 type QueryConsumerOffsetRequest struct {
        ConsumerGroup string `json:"consumerGroup"`
        Topic         string `json:"topic"`
-       QueueId       int32  `json:"queueId"`
+       QueueId       int    `json:"queueId"`
+}
+
+func (request *QueryConsumerOffsetRequest) Encode() map[string]string {
+       maps := make(map[string]string)
+       maps["consumerGroup"] = request.ConsumerGroup
+       maps["topic"] = request.Topic
+       maps["queueId"] = strconv.Itoa(request.QueueId)
+       return maps
 }
 
 type SearchOffsetRequest struct {
        Topic     string `json:"topic"`
-       QueueId   int32  `json:"queueId"`
+       QueueId   int    `json:"queueId"`
        Timestamp int64  `json:"timestamp"`
 }
 
+func (request *SearchOffsetRequest) Encode() map[string]string {
+       maps := make(map[string]string)
+       maps["Topic"] = request.Topic
+       maps["QueueId"] = strconv.Itoa(request.QueueId)
+       maps["timestamp"] = strconv.FormatInt(request.Timestamp, 10)
+       return maps
+}
+
 type UpdateConsumerOffsetRequest struct {
        ConsumerGroup string `json:"consumerGroup"`
        Topic         string `json:"topic"`
-       QueueId       int32  `json:"queueId"`
+       QueueId       int    `json:"queueId"`
        CommitOffset  int64  `json:"commitOffset"`
 }
 
+func (request *UpdateConsumerOffsetRequest) Encode() map[string]string {
+       maps := make(map[string]string)
+       maps["consumerGroup"] = request.ConsumerGroup
+       maps["topic"] = request.Topic
+       maps["queueId"] = strconv.Itoa(request.QueueId)
+       maps["commitOffset"] = strconv.FormatInt(request.CommitOffset, 10)
+       return maps
+}
+
 type GetRouteInfoRequest struct {
        Topic string `json:"topic"`
 }
diff --git a/kernel/route.go b/kernel/route.go
index f8cd884..4ff4f99 100644
--- a/kernel/route.go
+++ b/kernel/route.go
@@ -138,7 +138,7 @@ func FindBrokerAddrByTopic(topic string) string {
        return addr
 }
 
-func FindBrokerAddressInPublish(brokerName string) string {
+func FindBrokerAddrByName(brokerName string) string {
        bd, exist := brokerAddressesMap.Load(brokerName)
 
        if !exist {
diff --git a/remote/remote_client.go b/remote/remote_client.go
index 9f5839a..bfe483c 100644
--- a/remote/remote_client.go
+++ b/remote/remote_client.go
@@ -116,7 +116,7 @@ func InvokeAsync(addr string, request *RemotingCommand, 
timeoutMillis time.Durat
 
 }
 
-func InvokeOneWay(addr string, request *RemotingCommand) error {
+func InvokeOneWay(addr string, request *RemotingCommand, timeout 
time.Duration) error {
        conn, err := connect(addr)
        if err != nil {
                return err
diff --git a/remote/remote_client_test.go b/remote/remote_client_test.go
index 0a05953..acd0d26 100644
--- a/remote/remote_client_test.go
+++ b/remote/remote_client_test.go
@@ -261,7 +261,7 @@ func TestInvokeOneWay(t *testing.T) {
        var wg sync.WaitGroup
        wg.Add(1)
        go func() {
-               err := InvokeOneWay(":3000", clientSendRemtingCommand)
+               err := InvokeOneWay(":3000", clientSendRemtingCommand, 
3*time.Second)
                if err != nil {
                        t.Fatalf("failed to invoke synchronous. %s", err)
                }
diff --git a/examples/producer/main.go b/utils/errors.go
similarity index 53%
copy from examples/producer/main.go
copy to utils/errors.go
index 5c12584..0d96d97 100644
--- a/examples/producer/main.go
+++ b/utils/errors.go
@@ -15,33 +15,12 @@ See the License for the specific language governing 
permissions and
 limitations under the License.
 */
 
-package main
+package utils
 
-import (
-       "fmt"
-       "github.com/apache/rocketmq-client-go/consumer"
-       "github.com/apache/rocketmq-client-go/kernel"
-       "os"
-       "time"
-)
+import "github.com/apache/rocketmq-client-go/rlog"
 
-func main() {
-       c := consumer.NewPushConsumer("testGroup", consumer.ConsumerOption{
-               ConsumerModel: consumer.Clustering,
-               FromWhere:     consumer.ConsumeFromFirstOffset,
-       })
-       err := c.Subscribe("testTopic", consumer.MessageSelector{}, func(ctx 
*consumer.ConsumeMessageContext,
-               msgs []*kernel.MessageExt) (consumer.ConsumeResult, error) {
-               fmt.Println(msgs)
-               return consumer.ConsumeSuccess, nil
-       })
+func CheckError(action string, err error) {
        if err != nil {
-               fmt.Println(err.Error())
+               rlog.Errorf("%s error: %s", action, err.Error())
        }
-       err = c.Start()
-       if err != nil {
-               fmt.Println(err.Error())
-               os.Exit(-1)
-       }
-       time.Sleep(time.Hour)
 }
diff --git a/utils/files.go b/utils/files.go
new file mode 100644
index 0000000..b1c2c36
--- /dev/null
+++ b/utils/files.go
@@ -0,0 +1,65 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package utils
+
+import (
+       "fmt"
+       "os"
+       "path/filepath"
+)
+
+func FileReadAll(path string) ([]byte, error) {
+       stat, err := os.Stat(path)
+       if err != nil {
+               return nil, err
+       }
+       file, err := os.Open(path)
+       if err != nil {
+               return nil, err
+       }
+       data := make([]byte, stat.Size())
+       _, err = file.Read(data)
+       if err != nil {
+               return nil, err
+       }
+       return data, nil
+}
+
+func WriteToFile(path string, data []byte) error {
+       tmpFile, err := os.Create(filepath.Join(path, ".tmp"))
+       if err != nil {
+               return err
+       }
+       _, err = tmpFile.Write(data)
+       if err != nil {
+               return err
+       }
+       CheckError(fmt.Sprintf("close %s", tmpFile.Name()), tmpFile.Close())
+
+       prevContent, err := FileReadAll(path)
+       if err == nil {
+               bakFile, err := os.Create(filepath.Join(path, ".bak"))
+               _, err = bakFile.Write(prevContent)
+               if err != nil {
+                       return err
+               }
+               CheckError(fmt.Sprintf("close %s", bakFile.Name()), 
bakFile.Close())
+       }
+       CheckError(fmt.Sprintf("remove %s", path), os.Remove(path))
+       return os.Rename(filepath.Join(path, ".tmp"), path)
+}

Reply via email to