This is an automated email from the ASF dual-hosted git repository.
dinglei pushed a commit to branch native
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git
The following commit(s) were added to refs/heads/native by this push:
new e6de4b4 Add OffsetStore for Consumer (#49)
e6de4b4 is described below
commit e6de4b44f90d52053316c59359b32b3b679babf1
Author: wenfeng <[email protected]>
AuthorDate: Tue Apr 30 22:08:13 2019 +0800
Add OffsetStore for Consumer (#49)
* add impl of LocalOffsetStoreage
* add impl of RemoteBrokerStore
* fix offset bugs
---
consumer/consumer.go | 36 ++-
.../producer/main.go => consumer/consumer_test.go | 32 +--
consumer/offset_store.go | 302 ++++++++++++++++++++-
consumer/process_queue.go | 16 +-
consumer/push_consumer.go | 29 +-
examples/producer/main.go | 2 +-
kernel/client.go | 140 ++++++----
kernel/request.go | 59 +++-
kernel/route.go | 2 +-
remote/remote_client.go | 2 +-
remote/remote_client_test.go | 2 +-
examples/producer/main.go => utils/errors.go | 29 +-
utils/files.go | 65 +++++
13 files changed, 565 insertions(+), 151 deletions(-)
diff --git a/consumer/consumer.go b/consumer/consumer.go
index 60d3dd5..f128f66 100644
--- a/consumer/consumer.go
+++ b/consumer/consumer.go
@@ -193,6 +193,14 @@ func (pr *PullRequest) String() string {
type ConsumerOption struct {
kernel.ClientOption
+ /**
+ * Backtracking consumption time with second precision. Time format is
+ * 20131223171201<br>
+ * Implying Seventeen twelve and 01 seconds on December 23, 2013
year<br>
+ * Default backtracking consumption time Half an hour ago.
+ */
+ ConsumeTimestamp string
+
// The socket timeout in milliseconds
ConsumerPullTimeout time.Duration
@@ -549,7 +557,7 @@ func (dc *defaultConsumer) doUnlock(addr string, body
*lockBatchRequestBody, one
data, _ := json.Marshal(body)
request := remote.NewRemotingCommand(kernel.ReqUnlockBatchMQ, nil, data)
if oneway {
- err := remote.InvokeOneWay(addr, request)
+ err := remote.InvokeOneWay(addr, request, 3*time.Second)
if err != nil {
rlog.Errorf("lock mq to broker with oneway: %s error
%s", addr, err.Error())
}
@@ -581,6 +589,7 @@ func (dc *defaultConsumer)
buildProcessQueueTableByBrokerName() map[string][]*ke
return result
}
+// TODO 问题不少 需要再好好对一下
func (dc *defaultConsumer) updateProcessQueueTable(topic string, mqs
[]*kernel.MessageQueue) bool {
var changed bool
mqSet := make(map[*kernel.MessageQueue]bool)
@@ -614,6 +623,10 @@ func (dc *defaultConsumer) updateProcessQueueTable(topic
string, mqs []*kernel.M
if dc.cType == _PushConsume {
for mq := range mqSet {
+ _, exist := dc.processQueueTable.Load(mq)
+ if exist {
+ continue
+ }
if dc.consumeOrderly && !dc.lock(mq) {
rlog.Warnf("do defaultConsumer, Group:%s add a
new mq failed, %s, because lock failed",
dc.consumerGroup, mq.String())
@@ -669,13 +682,17 @@ func (dc *defaultConsumer) computePullFromWhere(mq
*kernel.MessageQueue) int64 {
case ConsumeFromLastOffset:
if lastOffset == -1 {
if strings.HasPrefix(mq.Topic,
kernel.RetryGroupTopicPrefix) {
- lastOffset, err :=
kernel.QueryMaxOffset(mq.Topic, mq.QueueId)
+ lastOffset = 0
+ } else {
+ lastOffset, err :=
kernel.QueryMaxOffset(mq)
if err == nil {
result = lastOffset
} else {
rlog.Warnf("query max offset
of: [%s:%d] error, %s", mq.Topic, mq.QueueId, err.Error())
}
}
+ } else {
+ result = -1
}
case ConsumeFromFirstOffset:
if lastOffset == -1 {
@@ -684,14 +701,25 @@ func (dc *defaultConsumer) computePullFromWhere(mq
*kernel.MessageQueue) int64 {
case ConsumeFromTimestamp:
if lastOffset == -1 {
if strings.HasPrefix(mq.Topic,
kernel.RetryGroupTopicPrefix) {
- lastOffset, err :=
kernel.QueryMaxOffset(mq.Topic, mq.QueueId)
+ lastOffset, err :=
kernel.QueryMaxOffset(mq)
if err == nil {
result = lastOffset
} else {
+ result = -1
rlog.Warnf("query max offset
of: [%s:%d] error, %s", mq.Topic, mq.QueueId, err.Error())
}
} else {
- // TODO parse timestamp
+ t, err := time.Parse("20060102150405",
dc.option.ConsumeTimestamp)
+ if err != nil {
+ result = -1
+ } else {
+ lastOffset, err :=
kernel.SearchOffsetByTimestamp(mq, t.Unix())
+ if err != nil {
+ result = -1
+ } else {
+ result = lastOffset
+ }
+ }
}
}
default:
diff --git a/examples/producer/main.go b/consumer/consumer_test.go
similarity index 53%
copy from examples/producer/main.go
copy to consumer/consumer_test.go
index 5c12584..2eb3f24 100644
--- a/examples/producer/main.go
+++ b/consumer/consumer_test.go
@@ -15,33 +15,17 @@ See the License for the specific language governing
permissions and
limitations under the License.
*/
-package main
+package consumer
import (
- "fmt"
- "github.com/apache/rocketmq-client-go/consumer"
- "github.com/apache/rocketmq-client-go/kernel"
- "os"
+ "github.com/stretchr/testify/assert"
+ "testing"
"time"
)
-func main() {
- c := consumer.NewPushConsumer("testGroup", consumer.ConsumerOption{
- ConsumerModel: consumer.Clustering,
- FromWhere: consumer.ConsumeFromFirstOffset,
- })
- err := c.Subscribe("testTopic", consumer.MessageSelector{}, func(ctx
*consumer.ConsumeMessageContext,
- msgs []*kernel.MessageExt) (consumer.ConsumeResult, error) {
- fmt.Println(msgs)
- return consumer.ConsumeSuccess, nil
- })
- if err != nil {
- fmt.Println(err.Error())
- }
- err = c.Start()
- if err != nil {
- fmt.Println(err.Error())
- os.Exit(-1)
- }
- time.Sleep(time.Hour)
+func TestParseTimestamp(t *testing.T) {
+ layout := "20060102150405"
+ timestamp, err := time.ParseInLocation(layout, "20190430193409",
time.Local)
+ assert.Nil(t, err)
+ assert.Equal(t, int64(1556624049), timestamp.Unix())
}
diff --git a/consumer/offset_store.go b/consumer/offset_store.go
index a11280d..6b6d719 100644
--- a/consumer/offset_store.go
+++ b/consumer/offset_store.go
@@ -17,7 +17,19 @@ limitations under the License.
package consumer
-import "github.com/apache/rocketmq-client-go/kernel"
+import (
+ "encoding/json"
+ "fmt"
+ "github.com/apache/rocketmq-client-go/kernel"
+ "github.com/apache/rocketmq-client-go/remote"
+ "github.com/apache/rocketmq-client-go/rlog"
+ "github.com/apache/rocketmq-client-go/utils"
+ "os"
+ "path/filepath"
+ "strconv"
+ "sync"
+ "time"
+)
type readType int
@@ -27,6 +39,16 @@ const (
_ReadMemoryThenStore
)
+var (
+ _LocalOffsetStorePath = os.Getenv("rocketmq.client.localOffsetStoreDir")
+)
+
+func init() {
+ if _LocalOffsetStorePath == "" {
+ _LocalOffsetStorePath = filepath.Join(os.Getenv("user.home"),
".rocketmq_client_go")
+ }
+}
+
type OffsetStore interface {
load()
persist(mqs []*kernel.MessageQueue)
@@ -36,20 +58,280 @@ type OffsetStore interface {
}
type localFileOffsetStore struct {
+ group string
+ path string
+ OffsetTable map[string]map[int]*queueOffset `json:"OffsetTable"`
+ // mutex for offset file
+ mutex sync.Mutex
}
-func (local *localFileOffsetStore) load()
{}
-func (local *localFileOffsetStore) persist(mqs []*kernel.MessageQueue)
{}
-func (local *localFileOffsetStore) remove(mq *kernel.MessageQueue)
{}
-func (local *localFileOffsetStore) read(mq *kernel.MessageQueue, t readType)
int64 { return 0 }
-func (local *localFileOffsetStore) update(mq *kernel.MessageQueue, offset
int64, increaseOnly bool) {}
+type queueOffset struct {
+ QueueID int `json:"queueId"`
+ Broker string `json:"brokerName"`
+ Offset int64 `json:"offset"`
+}
+
+func NewLocalFileOffsetStore(clientID, group string) OffsetStore {
+ store := &localFileOffsetStore{
+ group: group,
+ path: filepath.Join(_LocalOffsetStorePath, clientID, group,
"offset.json"),
+ }
+ store.load()
+ return store
+}
+
+func (local *localFileOffsetStore) load() {
+ local.mutex.Lock()
+ defer local.mutex.Unlock()
+ data, err := utils.FileReadAll(local.path)
+ if err != nil {
+ data, err = utils.FileReadAll(filepath.Join(local.path, ".bak"))
+ }
+ if err != nil {
+ rlog.Debugf("load local offset: %s error: %s", local.path,
err.Error())
+ return
+ }
+ err = json.Unmarshal(data, local)
+ if err != nil {
+ rlog.Debugf("unmarshal local offset: %s error: %s", local.path,
err.Error())
+ return
+ }
+}
+
+func (local *localFileOffsetStore) read(mq *kernel.MessageQueue, t readType)
int64 {
+ if t == _ReadFromMemory || t == _ReadMemoryThenStore {
+ off := readFromMemory(local.OffsetTable, mq)
+ if off >= 0 || (off == -1 && t == _ReadFromMemory) {
+ return off
+ }
+ }
+ local.load()
+ return readFromMemory(local.OffsetTable, mq)
+}
+
+func (local *localFileOffsetStore) update(mq *kernel.MessageQueue, offset
int64, increaseOnly bool) {
+ rlog.Infof("update offset: %s to %d", mq, offset)
+ localOffset, exist := local.OffsetTable[mq.Topic]
+ if !exist {
+ localOffset = make(map[int]*queueOffset)
+ local.OffsetTable[mq.Topic] = localOffset
+ }
+ q, exist := localOffset[mq.QueueId]
+ if !exist {
+ q = &queueOffset{
+ QueueID: mq.QueueId,
+ Broker: mq.BrokerName,
+ }
+ localOffset[mq.QueueId] = q
+ }
+ if increaseOnly {
+ if q.Offset < offset {
+ q.Offset = offset
+ }
+ } else {
+ q.Offset = offset
+ }
+}
+
+func (local *localFileOffsetStore) persist(mqs []*kernel.MessageQueue) {
+ if len(mqs) == 0 {
+ return
+ }
+ s := new(struct {
+ OffsetTable map[string]map[int]*queueOffset `json:"offsetTable"`
+ })
+ table := make(map[string]map[int]*queueOffset)
+ for idx := range mqs {
+ mq := mqs[idx]
+ offsets, exist := local.OffsetTable[mq.Topic]
+ if !exist {
+ continue
+ }
+ off, exist := offsets[mq.QueueId]
+ if !exist {
+ continue
+ }
+
+ offsets, exist = table[mq.Topic]
+ if !exist {
+ offsets = make(map[int]*queueOffset)
+ }
+ offsets[off.QueueID] = off
+ }
+ data, _ := json.Marshal(s)
+ utils.CheckError(fmt.Sprintf("persist offset to %s", local.path),
utils.WriteToFile(local.path, data))
+}
+
+func (local *localFileOffsetStore) remove(mq *kernel.MessageQueue) {
+ // unsupported
+}
type remoteBrokerOffsetStore struct {
+ group string
+ OffsetTable map[string]map[int]*queueOffset `json:"OffsetTable"`
+ mutex sync.RWMutex
+}
+
+func NewRemoteOffsetStore(group string) OffsetStore {
+ return &remoteBrokerOffsetStore{
+ group: group,
+ OffsetTable: make(map[string]map[int]*queueOffset),
+ }
+}
+
+func (remote *remoteBrokerOffsetStore) load() {
+ // unsupported
+}
+
+func (remote *remoteBrokerOffsetStore) persist(mqs []*kernel.MessageQueue) {
+ remote.mutex.Lock()
+ defer remote.mutex.Unlock()
+ if len(mqs) == 0 {
+ return
+ }
+ for idx := range mqs {
+ mq := mqs[idx]
+ offsets, exist := remote.OffsetTable[mq.Topic]
+ if !exist {
+ continue
+ }
+ off, exist := offsets[mq.QueueId]
+ if !exist {
+ continue
+ }
+
+ err := updateConsumeOffsetToBroker(remote.group, mq.Topic, off)
+ if err != nil {
+ rlog.Warnf("update offset to broker error: %s, group:
%s, queue: %s, offset: %d",
+ err.Error(), remote.group, mq.String(),
off.Offset)
+ } else {
+ rlog.Infof("update offset to broker success, group: %s,
topic: %s, queue: %v", remote.group, mq.Topic, off)
+ }
+ }
+}
+
+func (remote *remoteBrokerOffsetStore) remove(mq *kernel.MessageQueue) {
+ remote.mutex.Lock()
+ defer remote.mutex.Unlock()
+ if mq == nil {
+ return
+ }
+ offset, exist := remote.OffsetTable[mq.Topic]
+ if !exist {
+ return
+ }
+ rlog.Infof("delete: %s", mq.String())
+ delete(offset, mq.QueueId)
+}
+
+func (remote *remoteBrokerOffsetStore) read(mq *kernel.MessageQueue, t
readType) int64 {
+ remote.mutex.RLock()
+ if t == _ReadFromMemory || t == _ReadMemoryThenStore {
+ off := readFromMemory(remote.OffsetTable, mq)
+ if off >= 0 || (off == -1 && t == _ReadFromMemory) {
+ remote.mutex.RUnlock()
+ return off
+ }
+ }
+ off, err := fetchConsumeOffsetFromBroker(remote.group, mq)
+ if err != nil {
+ rlog.Errorf("fetch offset of %s error: %s", mq.String(),
err.Error())
+ remote.mutex.RUnlock()
+ return -1
+ }
+ remote.mutex.RUnlock()
+ remote.update(mq, off, true)
+ return off
}
-func (remote *remoteBrokerOffsetStore) load()
{}
-func (remote *remoteBrokerOffsetStore) persist(mqs []*kernel.MessageQueue)
{}
-func (remote *remoteBrokerOffsetStore) remove(mq *kernel.MessageQueue)
{}
-func (remote *remoteBrokerOffsetStore) read(mq *kernel.MessageQueue, t
readType) int64 { return 0 }
func (remote *remoteBrokerOffsetStore) update(mq *kernel.MessageQueue, offset
int64, increaseOnly bool) {
+ rlog.Infof("update offset: %s to %d", mq, offset)
+ remote.mutex.Lock()
+ defer remote.mutex.Unlock()
+ localOffset, exist := remote.OffsetTable[mq.Topic]
+ if !exist {
+ localOffset = make(map[int]*queueOffset)
+ remote.OffsetTable[mq.Topic] = localOffset
+ }
+ q, exist := localOffset[mq.QueueId]
+ if !exist {
+ rlog.Infof("new queueOffset: %d, off: %d", mq.QueueId, offset)
+ q = &queueOffset{
+ QueueID: mq.QueueId,
+ Broker: mq.BrokerName,
+ }
+ localOffset[mq.QueueId] = q
+ }
+ if increaseOnly {
+ if q.Offset < offset {
+ q.Offset = offset
+ }
+ } else {
+ q.Offset = offset
+ }
+}
+
+func readFromMemory(table map[string]map[int]*queueOffset, mq
*kernel.MessageQueue) int64 {
+ localOffset, exist := table[mq.Topic]
+ if !exist {
+ return -1
+ }
+ off, exist := localOffset[mq.QueueId]
+ if !exist {
+ return -1
+ }
+
+ return off.Offset
+}
+
+func fetchConsumeOffsetFromBroker(group string, mq *kernel.MessageQueue)
(int64, error) {
+ broker := kernel.FindBrokerAddrByName(mq.BrokerName)
+ if broker == "" {
+ kernel.UpdateTopicRouteInfo(mq.Topic)
+ broker = kernel.FindBrokerAddrByName(mq.BrokerName)
+ }
+ if broker == "" {
+ return int64(-1), fmt.Errorf("broker: %s address not found",
mq.BrokerName)
+ }
+ queryOffsetRequest := &kernel.QueryConsumerOffsetRequest{
+ ConsumerGroup: group,
+ Topic: mq.Topic,
+ QueueId: mq.QueueId,
+ }
+ cmd := remote.NewRemotingCommand(kernel.ReqQueryConsumerOffset,
queryOffsetRequest, nil)
+ res, err := remote.InvokeSync(broker, cmd, 3*time.Second)
+ if err != nil {
+ return -1, err
+ }
+ if res.Code != kernel.ResSuccess {
+ return -2, fmt.Errorf("broker response code: %d, remarks: %s",
res.Code, res.Remark)
+ }
+
+ off, err := strconv.ParseInt(res.ExtFields["offset"], 10, 64)
+
+ if err != nil {
+ return -1, err
+ }
+
+ return off, nil
+}
+
+func updateConsumeOffsetToBroker(group, topic string, queue *queueOffset)
error {
+ broker := kernel.FindBrokerAddrByName(queue.Broker)
+ if broker == "" {
+ kernel.UpdateTopicRouteInfo(topic)
+ broker = kernel.FindBrokerAddrByName(queue.Broker)
+ }
+ if broker == "" {
+ return fmt.Errorf("broker: %s address not found", queue.Broker)
+ }
+
+ updateOffsetRequest := &kernel.UpdateConsumerOffsetRequest{
+ ConsumerGroup: group,
+ Topic: topic,
+ QueueId: queue.QueueID,
+ CommitOffset: queue.Offset,
+ }
+ cmd := remote.NewRemotingCommand(kernel.ReqUpdateConsumerOffset,
updateOffsetRequest, nil)
+ return remote.InvokeOneWay(broker, cmd, 5*time.Second)
}
diff --git a/consumer/process_queue.go b/consumer/process_queue.go
index 0db0766..f4367f9 100644
--- a/consumer/process_queue.go
+++ b/consumer/process_queue.go
@@ -64,20 +64,26 @@ func (pq *ProcessQueue) putMessage(messages
[]*kernel.MessageExt) {
localList := list.New()
for idx := range messages {
localList.PushBack(messages[idx])
+ pq.queueOffsetMax = messages[idx].QueueOffset
}
pq.mutex.Lock()
pq.msgCache.PushBackList(localList)
pq.mutex.Unlock()
}
-func (pq *ProcessQueue) removeMessage(number int) int {
- i := 0
+func (pq *ProcessQueue) removeMessage(number int) int64 {
+ result := pq.queueOffsetMax + 1
pq.mutex.Lock()
- for ; i < number && pq.msgCache.Len() > 0; i++ {
- pq.msgCache.Remove(pq.msgCache.Front())
+ for i := 0; i < number && pq.msgCache.Len() > 0; i++ {
+ head := pq.msgCache.Front()
+ pq.msgCache.Remove(head)
+ result = head.Value.(*kernel.MessageExt).QueueOffset
}
pq.mutex.Unlock()
- return i
+ if pq.msgCache.Len() > 0 {
+ result =
pq.msgCache.Front().Value.(*kernel.MessageExt).QueueOffset
+ }
+ return result
}
func (pq *ProcessQueue) takeMessages(number int) []*kernel.MessageExt {
diff --git a/consumer/push_consumer.go b/consumer/push_consumer.go
index 58f4c68..fc78e3e 100644
--- a/consumer/push_consumer.go
+++ b/consumer/push_consumer.go
@@ -20,7 +20,6 @@ package consumer
import (
"context"
"errors"
- "fmt"
"github.com/apache/rocketmq-client-go/kernel"
"github.com/apache/rocketmq-client-go/rlog"
"math"
@@ -53,13 +52,6 @@ type PushConsumer interface {
type pushConsumer struct {
*defaultConsumer
- /**
- * Backtracking consumption time with second precision. Time format is
- * 20131223171201<br>
- * Implying Seventeen twelve and 01 seconds on December 23, 2013
year<br>
- * Default backtracking consumption time Half an hour ago.
- */
- ConsumeTimestamp time.Duration
queueFlowControlTimes int
queueMaxSpanFlowControlTimes int
consume func(*ConsumeMessageContext,
[]*kernel.MessageExt) (ConsumeResult, error)
@@ -97,9 +89,8 @@ func NewPushConsumer(consumerGroup string, opt
ConsumerOption) PushConsumer {
}
p := &pushConsumer{
- defaultConsumer: dc,
- ConsumeTimestamp: 30 * time.Minute,
- subscribedTopic: make(map[string]string, 0),
+ defaultConsumer: dc,
+ subscribedTopic: make(map[string]string, 0),
}
dc.mqChanged = p.messageQueueChanged
if p.consumeOrderly {
@@ -118,8 +109,8 @@ func (pc *pushConsumer) Start() error {
pc.state = kernel.StateStartFailed
pc.validate()
- // set retry topic
if pc.model == Clustering {
+ // set retry topic
retryTopic := kernel.GetRetryTopic(pc.consumerGroup)
pc.subscriptionDataTable.Store(retryTopic,
buildSubscriptionData(retryTopic,
MessageSelector{TAG, _SubAll}))
@@ -128,9 +119,9 @@ func (pc *pushConsumer) Start() error {
pc.client =
kernel.GetOrNewRocketMQClient(pc.option.ClientOption)
if pc.model == Clustering {
pc.option.ChangeInstanceNameToPID()
- pc.storage = &remoteBrokerOffsetStore{}
+ pc.storage = NewRemoteOffsetStore(pc.consumerGroup)
} else {
- pc.storage = &localFileOffsetStore{}
+ pc.storage = NewLocalFileOffsetStore(pc.consumerGroup,
pc.client.ClientID())
}
pc.storage.load()
go func() {
@@ -139,7 +130,6 @@ func (pc *pushConsumer) Start() error {
for {
pr := <-pc.prCh
go func() {
- fmt.Println(pr.String())
pc.pullMessage(&pr)
}()
}
@@ -417,7 +407,7 @@ func (pc *pushConsumer) pullMessage(request *PullRequest) {
QueueOffset: request.nextOffset,
MaxMsgNums: pc.option.PullBatchSize,
SysFlag: sysFlag,
- CommitOffset: 0,
+ CommitOffset: commitOffsetValue,
SubExpression: _SubAll,
ExpressionType: string(TAG), // TODO
}
@@ -428,7 +418,6 @@ func (pc *pushConsumer) pullMessage(request *PullRequest) {
// pullRequest.SubVersion = data.SubVersion
//}
- //ch := make(chan *kernel.PullResult)
brokerResult := tryFindBroker(request.mq)
if brokerResult == nil {
rlog.Warnf("no broker found for %s",
request.mq.String())
@@ -437,13 +426,13 @@ func (pc *pushConsumer) pullMessage(request *PullRequest)
{
}
result, err := pc.client.PullMessage(context.Background(),
brokerResult.BrokerAddr, pullRequest)
if err != nil {
- rlog.Warnf("pull message from %s error: %s",
"127.0.0.1:10911", err.Error())
+ rlog.Warnf("pull message from %s error: %s",
brokerResult.BrokerAddr, err.Error())
sleepTime = _PullDelayTimeWhenError
goto NEXT
}
if result.Status == kernel.PullBrokerTimeout {
- rlog.Warnf("pull broker: %s timeout", "127.0.0.1:10911")
+ rlog.Warnf("pull broker: %s timeout",
brokerResult.BrokerAddr)
sleepTime = _PullDelayTimeWhenError
goto NEXT
}
@@ -486,7 +475,7 @@ func (pc *pushConsumer) pullMessage(request *PullRequest) {
rlog.Warnf("fix the pull request offset: %s",
request.String())
}()
default:
- rlog.Warnf("")
+ rlog.Warnf("unknown pull status: %v", result.Status)
sleepTime = _PullDelayTimeWhenError
}
}
diff --git a/examples/producer/main.go b/examples/producer/main.go
index 5c12584..e2d0126 100644
--- a/examples/producer/main.go
+++ b/examples/producer/main.go
@@ -30,7 +30,7 @@ func main() {
ConsumerModel: consumer.Clustering,
FromWhere: consumer.ConsumeFromFirstOffset,
})
- err := c.Subscribe("testTopic", consumer.MessageSelector{}, func(ctx
*consumer.ConsumeMessageContext,
+ err := c.Subscribe("test", consumer.MessageSelector{}, func(ctx
*consumer.ConsumeMessageContext,
msgs []*kernel.MessageExt) (consumer.ConsumeResult, error) {
fmt.Println(msgs)
return consumer.ConsumeSuccess, nil
diff --git a/kernel/client.go b/kernel/client.go
index e83d598..8403ccb 100644
--- a/kernel/client.go
+++ b/kernel/client.go
@@ -101,63 +101,66 @@ type RMQClient struct {
// group -> InnerConsumer
consumerMap sync.Map
+ once sync.Once
}
var clientMap sync.Map
func GetOrNewRocketMQClient(option ClientOption) *RMQClient {
- // TODO
- return &RMQClient{option: option}
+ client := &RMQClient{option: option}
+ actual, _ := clientMap.LoadOrStore(client.ClientID(), client)
+ return actual.(*RMQClient)
}
func (c *RMQClient) Start() {
- // TODO fetchNameServerAddr
- go func() {}()
-
- // schedule update route info
- go func() {
- // delay
- time.Sleep(50 * time.Millisecond)
- for {
- c.UpdateTopicRouteInfo()
- time.Sleep(_PullNameServerInterval)
- }
- }()
-
- // TODO cleanOfflineBroker & sendHeartbeatToAllBrokerWithLock
- go func() {}()
-
- // schedule persist offset
- go func() {
- time.Sleep(10 * time.Second)
- for {
- c.consumerMap.Range(func(key, value interface{}) bool {
- consumer := value.(InnerConsumer)
- consumer.PersistConsumerOffset()
- return true
- })
- time.Sleep(_PersistOffset)
- }
- }()
+ c.once.Do(func() {
+ // TODO fetchNameServerAddr
+ go func() {}()
+
+ // schedule update route info
+ go func() {
+ // delay
+ time.Sleep(50 * time.Millisecond)
+ for {
+ c.UpdateTopicRouteInfo()
+ time.Sleep(_PullNameServerInterval)
+ }
+ }()
+
+ // TODO cleanOfflineBroker & sendHeartbeatToAllBrokerWithLock
+ go func() {}()
+
+ // schedule persist offset
+ go func() {
+ time.Sleep(10 * time.Second)
+ for {
+ c.consumerMap.Range(func(key, value
interface{}) bool {
+ consumer := value.(InnerConsumer)
+ consumer.PersistConsumerOffset()
+ return true
+ })
+ time.Sleep(_PersistOffset)
+ }
+ }()
- go func() {
- for {
- c.RebalanceImmediately()
- time.Sleep(time.Second)
- }
- }()
+ go func() {
+ for {
+ c.RebalanceImmediately()
+ time.Sleep(time.Second)
+ }
+ }()
+ })
}
func (c *RMQClient) ClientID() string {
- //id := c.option.ClientIP + "@" + c.option.InstanceName
- //if c.option.UnitName != "" {
- // id += "@" + c.option.UnitName
- //}
- return "127.0.0.1:10911@DEFAULT"
+ id := c.option.ClientIP + "@" + c.option.InstanceName
+ if c.option.UnitName != "" {
+ id += "@" + c.option.UnitName
+ }
+ return id
}
func (c *RMQClient) CheckClientInBroker() {
-
}
func (c *RMQClient) SendHeartbeatToAllBrokerWithLock() {
@@ -269,7 +272,7 @@ func (c *RMQClient) SendMessageAsync(ctx context.Context,
brokerAddrs, brokerNam
func (c *RMQClient) SendMessageOneWay(ctx context.Context, brokerAddrs string,
request *SendMessageRequest,
msgs []*Message) (*SendResult, error) {
cmd := remote.NewRemotingCommand(ReqSendBatchMessage, request,
encodeMessages(msgs))
- err := remote.InvokeOneWay(brokerAddrs, cmd)
+ err := remote.InvokeOneWay(brokerAddrs, cmd, 3*time.Second)
if err != nil {
rlog.Warnf("send messages with oneway error: %v", err)
}
@@ -379,18 +382,59 @@ func (c *RMQClient) PullMessageAsync(ctx context.Context,
brokerAddrs string, re
}
// QueryMaxOffset with specific queueId and topic
-func QueryMaxOffset(topic string, queueId int) (int64, error) {
- return 0, nil
+func QueryMaxOffset(mq *MessageQueue) (int64, error) {
+ brokerAddr := FindBrokerAddrByName(mq.BrokerName)
+ if brokerAddr == "" {
+ UpdateTopicRouteInfo(mq.Topic)
+ brokerAddr = FindBrokerAddrByName(mq.Topic)
+ }
+ if brokerAddr == "" {
+ return -1, fmt.Errorf("the broker [%s] does not exist",
mq.BrokerName)
+ }
+
+ request := &GetMaxOffsetRequest{
+ Topic: mq.Topic,
+ QueueId: mq.QueueId,
+ }
+
+ cmd := remote.NewRemotingCommand(ReqGetMaxOffset, request, nil)
+ response, err := remote.InvokeSync(brokerAddr, cmd, 3*time.Second)
+ if err != nil {
+ return -1, err
+ }
+
+ return strconv.ParseInt(response.ExtFields["offset"], 10, 64)
}
// QueryConsumerOffset with specific queueId and topic of consumerGroup
-func (c *RMQClient) QueryConsumerOffset(consumerGroup, topic string, queue
int) (int64, error) {
+func (c *RMQClient) QueryConsumerOffset(consumerGroup, mq *MessageQueue)
(int64, error) {
return 0, nil
}
// SearchOffsetByTimestamp with specific queueId and topic
-func (c *RMQClient) SearchOffsetByTimestamp(topic string, queue int, timestamp
int64) (int64, error) {
- return 0, nil
+func SearchOffsetByTimestamp(mq *MessageQueue, timestamp int64) (int64, error)
{
+ brokerAddr := FindBrokerAddrByName(mq.BrokerName)
+ if brokerAddr == "" {
+ UpdateTopicRouteInfo(mq.Topic)
+ brokerAddr = FindBrokerAddrByName(mq.Topic)
+ }
+ if brokerAddr == "" {
+ return -1, fmt.Errorf("the broker [%s] does not exist",
mq.BrokerName)
+ }
+
+ request := &SearchOffsetRequest{
+ Topic: mq.Topic,
+ QueueId: mq.QueueId,
+ Timestamp: timestamp,
+ }
+
+ cmd := remote.NewRemotingCommand(ReqSearchOffsetByTimestamp, request,
nil)
+ response, err := remote.InvokeSync(brokerAddr, cmd, 3*time.Second)
+ if err != nil {
+ return -1, err
+ }
+
+ return strconv.ParseInt(response.ExtFields["offset"], 10, 64)
}
// UpdateConsumerOffset with specific queueId and topic
diff --git a/kernel/request.go b/kernel/request.go
index 2647ad5..36770ec 100644
--- a/kernel/request.go
+++ b/kernel/request.go
@@ -19,17 +19,22 @@ package kernel
import (
"fmt"
+ "strconv"
"time"
)
const (
- ReqPullMessage = int16(11)
- ReqHeartBeat = int16(34)
- ReqGetConsumerListByGroup = int16(38)
- ReqLockBatchMQ = int16(41)
- ReqUnlockBatchMQ = int16(42)
- ReqGetRouteInfoByTopic = int16(105)
- ReqSendBatchMessage = int16(320)
+ ReqPullMessage = int16(11)
+ ReqQueryConsumerOffset = int16(14)
+ ReqUpdateConsumerOffset = int16(15)
+ ReqSearchOffsetByTimestamp = int16(30)
+ ReqGetMaxOffset = int16(30)
+ ReqHeartBeat = int16(34)
+ ReqGetConsumerListByGroup = int16(38)
+ ReqLockBatchMQ = int16(41)
+ ReqUnlockBatchMQ = int16(42)
+ ReqGetRouteInfoByTopic = int16(105)
+ ReqSendBatchMessage = int16(320)
)
type SendMessageRequest struct {
@@ -97,28 +102,60 @@ func (request *GetConsumerList) Encode() map[string]string
{
type GetMaxOffsetRequest struct {
Topic string `json:"topic"`
- QueueId int32 `json:"queueId"`
+ QueueId int `json:"queueId"`
+}
+
+func (request *GetMaxOffsetRequest) Encode() map[string]string {
+ maps := make(map[string]string)
+ maps["topic"] = request.Topic
+ maps["queueId"] = strconv.Itoa(request.QueueId)
+ return maps
}
type QueryConsumerOffsetRequest struct {
ConsumerGroup string `json:"consumerGroup"`
Topic string `json:"topic"`
- QueueId int32 `json:"queueId"`
+ QueueId int `json:"queueId"`
+}
+
+func (request *QueryConsumerOffsetRequest) Encode() map[string]string {
+ maps := make(map[string]string)
+ maps["consumerGroup"] = request.ConsumerGroup
+ maps["topic"] = request.Topic
+ maps["queueId"] = strconv.Itoa(request.QueueId)
+ return maps
}
type SearchOffsetRequest struct {
Topic string `json:"topic"`
- QueueId int32 `json:"queueId"`
+ QueueId int `json:"queueId"`
Timestamp int64 `json:"timestamp"`
}
+func (request *SearchOffsetRequest) Encode() map[string]string {
+ maps := make(map[string]string)
+ maps["Topic"] = request.Topic
+ maps["QueueId"] = strconv.Itoa(request.QueueId)
+ maps["timestamp"] = strconv.FormatInt(request.Timestamp, 10)
+ return maps
+}
+
type UpdateConsumerOffsetRequest struct {
ConsumerGroup string `json:"consumerGroup"`
Topic string `json:"topic"`
- QueueId int32 `json:"queueId"`
+ QueueId int `json:"queueId"`
CommitOffset int64 `json:"commitOffset"`
}
+func (request *UpdateConsumerOffsetRequest) Encode() map[string]string {
+ maps := make(map[string]string)
+ maps["consumerGroup"] = request.ConsumerGroup
+ maps["topic"] = request.Topic
+ maps["queueId"] = strconv.Itoa(request.QueueId)
+ maps["commitOffset"] = strconv.FormatInt(request.CommitOffset, 10)
+ return maps
+}
+
type GetRouteInfoRequest struct {
Topic string `json:"topic"`
}
diff --git a/kernel/route.go b/kernel/route.go
index f8cd884..4ff4f99 100644
--- a/kernel/route.go
+++ b/kernel/route.go
@@ -138,7 +138,7 @@ func FindBrokerAddrByTopic(topic string) string {
return addr
}
-func FindBrokerAddressInPublish(brokerName string) string {
+func FindBrokerAddrByName(brokerName string) string {
bd, exist := brokerAddressesMap.Load(brokerName)
if !exist {
diff --git a/remote/remote_client.go b/remote/remote_client.go
index 9f5839a..bfe483c 100644
--- a/remote/remote_client.go
+++ b/remote/remote_client.go
@@ -116,7 +116,7 @@ func InvokeAsync(addr string, request *RemotingCommand,
timeoutMillis time.Durat
}
-func InvokeOneWay(addr string, request *RemotingCommand) error {
+func InvokeOneWay(addr string, request *RemotingCommand, timeout
time.Duration) error {
conn, err := connect(addr)
if err != nil {
return err
diff --git a/remote/remote_client_test.go b/remote/remote_client_test.go
index 0a05953..acd0d26 100644
--- a/remote/remote_client_test.go
+++ b/remote/remote_client_test.go
@@ -261,7 +261,7 @@ func TestInvokeOneWay(t *testing.T) {
var wg sync.WaitGroup
wg.Add(1)
go func() {
- err := InvokeOneWay(":3000", clientSendRemtingCommand)
+ err := InvokeOneWay(":3000", clientSendRemtingCommand,
3*time.Second)
if err != nil {
t.Fatalf("failed to invoke synchronous. %s", err)
}
diff --git a/examples/producer/main.go b/utils/errors.go
similarity index 53%
copy from examples/producer/main.go
copy to utils/errors.go
index 5c12584..0d96d97 100644
--- a/examples/producer/main.go
+++ b/utils/errors.go
@@ -15,33 +15,12 @@ See the License for the specific language governing
permissions and
limitations under the License.
*/
-package main
+package utils
-import (
- "fmt"
- "github.com/apache/rocketmq-client-go/consumer"
- "github.com/apache/rocketmq-client-go/kernel"
- "os"
- "time"
-)
+import "github.com/apache/rocketmq-client-go/rlog"
-func main() {
- c := consumer.NewPushConsumer("testGroup", consumer.ConsumerOption{
- ConsumerModel: consumer.Clustering,
- FromWhere: consumer.ConsumeFromFirstOffset,
- })
- err := c.Subscribe("testTopic", consumer.MessageSelector{}, func(ctx
*consumer.ConsumeMessageContext,
- msgs []*kernel.MessageExt) (consumer.ConsumeResult, error) {
- fmt.Println(msgs)
- return consumer.ConsumeSuccess, nil
- })
+func CheckError(action string, err error) {
if err != nil {
- fmt.Println(err.Error())
+ rlog.Errorf("%s error: %s", action, err.Error())
}
- err = c.Start()
- if err != nil {
- fmt.Println(err.Error())
- os.Exit(-1)
- }
- time.Sleep(time.Hour)
}
diff --git a/utils/files.go b/utils/files.go
new file mode 100644
index 0000000..b1c2c36
--- /dev/null
+++ b/utils/files.go
@@ -0,0 +1,65 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package utils
+
+import (
+ "fmt"
+ "os"
+ "path/filepath"
+)
+
+func FileReadAll(path string) ([]byte, error) {
+ stat, err := os.Stat(path)
+ if err != nil {
+ return nil, err
+ }
+ file, err := os.Open(path)
+ if err != nil {
+ return nil, err
+ }
+ data := make([]byte, stat.Size())
+ _, err = file.Read(data)
+ if err != nil {
+ return nil, err
+ }
+ return data, nil
+}
+
+func WriteToFile(path string, data []byte) error {
+ tmpFile, err := os.Create(filepath.Join(path, ".tmp"))
+ if err != nil {
+ return err
+ }
+ _, err = tmpFile.Write(data)
+ if err != nil {
+ return err
+ }
+ CheckError(fmt.Sprintf("close %s", tmpFile.Name()), tmpFile.Close())
+
+ prevContent, err := FileReadAll(path)
+ if err == nil {
+ bakFile, err := os.Create(filepath.Join(path, ".bak"))
+ _, err = bakFile.Write(prevContent)
+ if err != nil {
+ return err
+ }
+ CheckError(fmt.Sprintf("close %s", bakFile.Name()),
bakFile.Close())
+ }
+ CheckError(fmt.Sprintf("remove %s", path), os.Remove(path))
+ return os.Rename(filepath.Join(path, ".tmp"), path)
+}