This is an automated email from the ASF dual-hosted git repository.
dinglei pushed a commit to branch native
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git
The following commit(s) were added to refs/heads/native by this push:
new cc1dfb2 Add Push Consumer (#47)
cc1dfb2 is described below
commit cc1dfb23aab9d8f18b21b587b0877ac2542b866b
Author: wenfeng <[email protected]>
AuthorDate: Fri Apr 26 14:00:48 2019 +0800
Add Push Consumer (#47)
* add push consumer
* add strategy
* push consumer done
* test push consumer
* fix push_consumer fatal error
* add license
---
config.go | 1 +
consumer.go | 302 ---------
consumer/consumer.go | 830 +++++++++++++++++++++++
consumer/offset_store.go | 55 ++
consumer/process_queue.go | 100 +++
consumer/pull_consumer.go | 198 ++++++
consumer/pull_consumer_test.go | 18 +
consumer/push_consumer.go | 606 +++++++++++++++++
consumer/push_consumer_test.go | 18 +
consumer/statistics.go | 60 ++
consumer/strategy.go | 130 ++++
consumer_test.go | 18 -
examples/main.go | 73 --
examples/producer.go | 50 --
examples/producer/main.go | 47 ++
examples/producer_orderly.go | 75 --
examples/pull_consumer.go | 74 --
examples/push_consumer.go | 59 --
go.mod | 1 +
go.sum | 2 +
kernel/client.go | 347 ++++++++--
kernel/client_test.go | 45 ++
kernel/constants.go | 27 +
kernel/message.go | 54 +-
kernel/model.go | 129 ++--
kernel/perm.go | 30 +-
kernel/request.go | 49 +-
kernel/route.go | 148 ++--
kernel/validators.go | 46 ++
remote/codes.go | 147 ----
remote/{client.go => remote_client.go} | 33 +-
remote/{client_test.go => remote_client_test.go} | 0
rlog/log.go | 10 -
utils/helper.go | 13 +-
utils/helper_test.go | 4 +-
utils/math.go | 32 +
utils/string.go | 40 ++
37 files changed, 2773 insertions(+), 1098 deletions(-)
diff --git a/config.go b/config.go
new file mode 100644
index 0000000..5519947
--- /dev/null
+++ b/config.go
@@ -0,0 +1 @@
+package rocketmq
diff --git a/consumer.go b/consumer.go
deleted file mode 100644
index 5119f93..0000000
--- a/consumer.go
+++ /dev/null
@@ -1,302 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one or more
-contributor license agreements. See the NOTICE file distributed with
-this work for additional information regarding copyright ownership.
-The ASF licenses this file to You under the Apache License, Version 2.0
-(the "License"); you may not use this file except in compliance with
-the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-*/
-
-package rocketmq
-
-import (
- "context"
- "errors"
- "fmt"
- "github.com/apache/rocketmq-client-go/kernel"
- "github.com/apache/rocketmq-client-go/rlog"
- "strconv"
- "sync"
- "sync/atomic"
- "time"
-)
-
-type Consumer interface {
- Start()
- Pull(topic, expression string, numbers int) (*kernel.PullResult, error)
- SubscribeWithChan(topic, expression string) (chan *kernel.Message,
error)
- SubscribeWithFunc(topic, expression string, f func(msg *kernel.Message)
ConsumeResult) error
- ACK(msg *kernel.Message, result ConsumeResult)
-}
-
-var (
- queueCounterTable sync.Map
-)
-
-type ConsumeResult int
-
-type ConsumerType int
-
-const (
- Original ConsumerType = iota
- Orderly
- Transaction
-
- SubAll = "*"
-)
-
-type ConsumerConfig struct {
- GroupName string
- Model kernel.MessageModel
- UnitMode bool
- MaxReconsumeTimes int
- PullMessageTimeout time.Duration
- FromWhere kernel.ConsumeFromWhere
- brokerSuspendMaxTimeMillis int64
-}
-
-func NewConsumer(config ConsumerConfig) Consumer {
- return &defaultConsumer{
- config: config,
- }
-}
-
-type defaultConsumer struct {
- state kernel.ServiceState
- config ConsumerConfig
-}
-
-func (c *defaultConsumer) Start() {
- c.state = kernel.Running
-}
-
-func (c *defaultConsumer) Pull(topic, expression string, numbers int)
(*kernel.PullResult, error) {
- mq := getNextQueueOf(topic)
- if mq == nil {
- return nil, fmt.Errorf("prepard to pull topic: %s, but no queue
is founded", topic)
- }
-
- data := getSubscriptionData(mq, expression)
- result, err := c.pull(context.Background(), mq, data,
c.nextOffsetOf(mq), numbers)
-
- if err != nil {
- return nil, err
- }
-
- processPullResult(mq, result, data)
- return result, nil
-}
-
-// SubscribeWithChan ack manually
-func (c *defaultConsumer) SubscribeWithChan(topic, expression string) (chan
*kernel.Message, error) {
- return nil, nil
-}
-
-// SubscribeWithFunc ack automatic
-func (c *defaultConsumer) SubscribeWithFunc(topic, expression string,
- f func(msg *kernel.Message) ConsumeResult) error {
- return nil
-}
-
-func (c *defaultConsumer) ACK(msg *kernel.Message, result ConsumeResult) {
-
-}
-
-func (c *defaultConsumer) pull(ctx context.Context, mq *kernel.MessageQueue,
data *kernel.SubscriptionData,
- offset int64, numbers int) (*kernel.PullResult, error) {
- err := c.makeSureStateOK()
- if err != nil {
- return nil, err
- }
-
- if mq == nil {
- return nil, errors.New("MessageQueue is nil")
- }
-
- if offset < 0 {
- return nil, errors.New("offset < 0")
- }
-
- if numbers <= 0 {
- numbers = 1
- }
- c.subscriptionAutomatically(mq.Topic)
-
- brokerResult := tryFindBroker(mq)
- if brokerResult == nil {
- return nil, fmt.Errorf("the broker %s does not exist",
mq.BrokerName)
- }
-
- if (data.ExpType == kernel.TAG) && brokerResult.BrokerVersion <
kernel.V4_1_0 {
- return nil, fmt.Errorf("the broker [%s, %v] does not upgrade to
support for filter message by %v",
- mq.BrokerName, brokerResult.BrokerVersion, data.ExpType)
- }
-
- sysFlag := buildSysFlag(false, true, true, false)
-
- if brokerResult.Slave {
- sysFlag = clearCommitOffsetFlag(sysFlag)
- }
- pullRequest := &kernel.PullMessageRequest{
- ConsumerGroup: c.config.GroupName,
- Topic: mq.Topic,
- QueueId: int32(mq.QueueId),
- QueueOffset: offset,
- MaxMsgNums: int32(numbers),
- SysFlag: sysFlag,
- CommitOffset: 0,
- SuspendTimeoutMillis: c.config.brokerSuspendMaxTimeMillis,
- SubExpression: data.SubString,
- ExpressionType: string(data.ExpType),
- }
-
- if data.ExpType == kernel.TAG {
- pullRequest.SubVersion = 0
- } else {
- pullRequest.SubVersion = data.SubVersion
- }
-
- // TODO computePullFromWhichFilterServer
- return kernel.PullMessage(ctx, brokerResult.BrokerAddr, pullRequest)
-}
-
-func (c *defaultConsumer) makeSureStateOK() error {
- if c.state != kernel.Running {
- return fmt.Errorf("the consumer state is [%d], not running",
c.state)
- }
- return nil
-}
-
-func (c *defaultConsumer) subscriptionAutomatically(topic string) {
- // TODO
-}
-
-func (c *defaultConsumer) nextOffsetOf(queue *kernel.MessageQueue) int64 {
- return 0
-}
-
-func toMessage(messageExts []*kernel.MessageExt) []*kernel.Message {
- msgs := make([]*kernel.Message, 0)
-
- return msgs
-}
-
-func processPullResult(mq *kernel.MessageQueue, result *kernel.PullResult,
data *kernel.SubscriptionData) {
- updatePullFromWhichNode(mq, result.SuggestWhichBrokerId)
- switch result.Status {
- case kernel.PullFound:
- msgs := result.GetMessageExts()
- msgListFilterAgain := msgs
- if len(data.Tags) > 0 && data.ClassFilterMode {
- msgListFilterAgain = make([]*kernel.MessageExt,
len(msgs))
- for _, msg := range msgs {
- _, exist := data.Tags[msg.GetTags()]
- if exist {
- msgListFilterAgain =
append(msgListFilterAgain, msg)
- }
- }
- }
-
- // TODO hook
-
- for _, msg := range msgListFilterAgain {
- traFlag, _ :=
strconv.ParseBool(msg.Properties[kernel.TransactionPrepared])
- if traFlag {
- msg.TransactionId =
msg.Properties[kernel.UniqueClientMessageIdKeyIndex]
- }
-
- msg.Properties[kernel.MinOffset] =
strconv.FormatInt(result.MinOffset, 10)
- msg.Properties[kernel.MaxOffset] =
strconv.FormatInt(result.MaxOffset, 10)
- }
-
- result.SetMessageExts(msgListFilterAgain)
- }
-}
-
-func getSubscriptionData(mq *kernel.MessageQueue, exp string)
*kernel.SubscriptionData {
- subData := &kernel.SubscriptionData{
- Topic: mq.Topic,
- }
- if exp == "" || exp == SubAll {
- subData.SubString = SubAll
- } else {
- // TODO
- }
- return subData
-}
-
-func getNextQueueOf(topic string) *kernel.MessageQueue {
- queues, err := kernel.FetchSubscribeMessageQueues(topic)
- if err != nil && len(queues) > 0 {
- rlog.Error(err.Error())
- return nil
- }
- var index int64
- v, exist := queueCounterTable.Load(topic)
- if !exist {
- index = -1
- queueCounterTable.Store(topic, 0)
- } else {
- index = v.(int64)
- }
-
- return queues[int(atomic.AddInt64(&index, 1))%len(queues)]
-}
-
-func buildSysFlag(commitOffset, suspend, subscription, classFilter bool) int32
{
- var flag int32 = 0
- if commitOffset {
- flag |= 0x1 << 0
- }
-
- if suspend {
- flag |= 0x1 << 1
- }
-
- if subscription {
- flag |= 0x1 << 2
- }
-
- if classFilter {
- flag |= 0x1 << 3
- }
-
- return flag
-}
-
-func clearCommitOffsetFlag(sysFlag int32) int32 {
- return sysFlag & (^0x1 << 0)
-}
-
-func tryFindBroker(mq *kernel.MessageQueue) *kernel.FindBrokerResult {
- result := kernel.FindBrokerAddressInSubscribe(mq.BrokerName,
recalculatePullFromWhichNode(mq), false)
-
- if result == nil {
- kernel.UpdateTopicRouteInfo(mq.Topic)
- }
- return kernel.FindBrokerAddressInSubscribe(mq.BrokerName,
recalculatePullFromWhichNode(mq), false)
-}
-
-var (
- pullFromWhichNodeTable sync.Map
-)
-
-func updatePullFromWhichNode(mq *kernel.MessageQueue, brokerId int64) {
- pullFromWhichNodeTable.Store(mq.HashCode(), brokerId)
-}
-
-func recalculatePullFromWhichNode(mq *kernel.MessageQueue) int64 {
- v, exist := pullFromWhichNodeTable.Load(mq.HashCode())
- if exist {
- return v.(int64)
- }
- return kernel.MasterId
-}
diff --git a/consumer/consumer.go b/consumer/consumer.go
new file mode 100644
index 0000000..60d3dd5
--- /dev/null
+++ b/consumer/consumer.go
@@ -0,0 +1,830 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package consumer
+
+import (
+ "encoding/json"
+ "fmt"
+ "github.com/apache/rocketmq-client-go/kernel"
+ "github.com/apache/rocketmq-client-go/remote"
+ "github.com/apache/rocketmq-client-go/rlog"
+ "github.com/apache/rocketmq-client-go/utils"
+ "github.com/tidwall/gjson"
+ "sort"
+ "strings"
+ "sync"
+ "sync/atomic"
+ "time"
+)
+
+const (
+ // Delay some time when exception error
+ _PullDelayTimeWhenError = 3 * time.Second
+
+ // Flow control interval
+ _PullDelayTimeWhenFlowControl = 50 * time.Millisecond
+
+ // Delay some time when suspend pull service
+ _PullDelayTimeWhenSuspend = 30 * time.Second
+
+ // Long polling mode, the Consumer connection max suspend time
+ _BrokerSuspendMaxTime = 20 * time.Second
+
+ // Long polling mode, the Consumer connection timeout (must greater
than _BrokerSuspendMaxTime)
+ _ConsumerTimeoutWhenSuspend = 30 * time.Second
+
+ // Offset persistent interval for consumer
+ _PersistConsumerOffsetInterval = 5 * time.Second
+)
+
+// Message model defines the way how messages are delivered to each consumer
clients.
+// </p>
+//
+// RocketMQ supports two message models: clustering and broadcasting. If
clustering is set, consumer clients with
+// the same {@link #consumerGroup} would only consume shards of the messages
subscribed, which achieves load
+// balances; Conversely, if the broadcasting is set, each consumer client will
consume all subscribed messages
+// separately.
+// </p>
+//
+// This field defaults to clustering.
+type MessageModel int
+
+const (
+ BroadCasting MessageModel = iota
+ Clustering
+)
+
+func (mode MessageModel) String() string {
+ switch mode {
+ case BroadCasting:
+ return "BroadCasting"
+ case Clustering:
+ return "Clustering"
+ default:
+ return "Unknown"
+ }
+}
+
+// Consuming point on consumer booting.
+// </p>
+//
+// There are three consuming points:
+// <ul>
+// <li>
+// <code>CONSUME_FROM_LAST_OFFSET</code>: consumer clients pick up where it
stopped previously.
+// If it were a newly booting up consumer client, according aging of the
consumer group, there are two
+// cases:
+// <ol>
+// <li>
+// if the consumer group is created so recently that the earliest message
being subscribed has yet
+// expired, which means the consumer group represents a lately launched
business, consuming will
+// start from the very beginning;
+// </li>
+// <li>
+// if the earliest message being subscribed has expired, consuming will start
from the latest
+// messages, meaning messages born prior to the booting timestamp would be
ignored.
+// </li>
+// </ol>
+// </li>
+// <li>
+// <code>CONSUME_FROM_FIRST_OFFSET</code>: Consumer client will start from
earliest messages available.
+// </li>
+// <li>
+// <code>CONSUME_FROM_TIMESTAMP</code>: Consumer client will start from
specified timestamp, which means
+// messages born prior to {@link #consumeTimestamp} will be ignored
+// </li>
+// </ul>
+type ConsumeFromWhere int
+
+const (
+ ConsumeFromLastOffset ConsumeFromWhere = iota
+ ConsumeFromFirstOffset
+ ConsumeFromTimestamp
+)
+
+type ConsumeType string
+
+const (
+ _PullConsume = ConsumeType("pull")
+ _PushConsume = ConsumeType("push")
+)
+
+type ExpressionType string
+
+const (
+ /**
+ * <ul>
+ * Keywords:
+ * <li>{@code AND, OR, NOT, BETWEEN, IN, TRUE, FALSE, IS, NULL}</li>
+ * </ul>
+ * <p/>
+ * <ul>
+ * Data type:
+ * <li>Boolean, like: TRUE, FALSE</li>
+ * <li>String, like: 'abc'</li>
+ * <li>Decimal, like: 123</li>
+ * <li>Float number, like: 3.1415</li>
+ * </ul>
+ * <p/>
+ * <ul>
+ * Grammar:
+ * <li>{@code AND, OR}</li>
+ * <li>{@code >, >=, <, <=, =}</li>
+ * <li>{@code BETWEEN A AND B}, equals to {@code >=A AND <=B}</li>
+ * <li>{@code NOT BETWEEN A AND B}, equals to {@code >B OR <A}</li>
+ * <li>{@code IN ('a', 'b')}, equals to {@code ='a' OR ='b'}, this
operation only support String type.</li>
+ * <li>{@code IS NULL}, {@code IS NOT NULL}, check parameter whether is
null, or not.</li>
+ * <li>{@code =TRUE}, {@code =FALSE}, check parameter whether is true,
or false.</li>
+ * </ul>
+ * <p/>
+ * <p>
+ * Example:
+ * (a > 10 AND a < 100) OR (b IS NOT NULL AND b=TRUE)
+ * </p>
+ */
+ SQL92 = ExpressionType("SQL92")
+
+ /**
+ * Only support or operation such as
+ * "tag1 || tag2 || tag3", <br>
+ * If null or * expression, meaning subscribe all.
+ */
+ TAG = ExpressionType("TAG")
+)
+
+func IsTagType(exp string) bool {
+ if exp == "" || exp == "TAG" {
+ return true
+ }
+ return false
+}
+
+const (
+ _SubAll = "*"
+)
+
+type PullRequest struct {
+ consumerGroup string
+ mq *kernel.MessageQueue
+ pq *ProcessQueue
+ nextOffset int64
+ lockedFirst bool
+}
+
+func (pr *PullRequest) String() string {
+ return fmt.Sprintf("[ConsumerGroup: %s, Topic: %s, MessageQueue: %d]",
+ pr.consumerGroup, pr.mq.Topic, pr.mq.QueueId)
+}
+
+type ConsumerOption struct {
+ kernel.ClientOption
+ // The socket timeout in milliseconds
+ ConsumerPullTimeout time.Duration
+
+ // Concurrently max span offset.it has no effect on sequential
consumption
+ ConsumeConcurrentlyMaxSpan int
+
+ // Flow control threshold on queue level, each message queue will cache
at most 1000 messages by default,
+ // Consider the {PullBatchSize}, the instantaneous value may exceed the
limit
+ PullThresholdForQueue int
+
+ // Limit the cached message size on queue level, each message queue
will cache at most 100 MiB messages by default,
+ // Consider the {@code pullBatchSize}, the instantaneous value may
exceed the limit
+ //
+ // The size of a message only measured by message body, so it's not
accurate
+ PullThresholdSizeForQueue int
+
+ // Flow control threshold on topic level, default value is -1(Unlimited)
+ //
+ // The value of {@code pullThresholdForQueue} will be overwrote and
calculated based on
+ // {@code pullThresholdForTopic} if it is't unlimited
+ //
+ // For example, if the value of pullThresholdForTopic is 1000 and 10
message queues are assigned to this consumer,
+ // then pullThresholdForQueue will be set to 100
+ PullThresholdForTopic int
+
+ // Limit the cached message size on topic level, default value is -1
MiB(Unlimited)
+ //
+ // The value of {@code pullThresholdSizeForQueue} will be overwrote and
calculated based on
+ // {@code pullThresholdSizeForTopic} if it is't unlimited
+ //
+ // For example, if the value of pullThresholdSizeForTopic is 1000 MiB
and 10 message queues are
+ // assigned to this consumer, then pullThresholdSizeForQueue will be
set to 100 MiB
+ PullThresholdSizeForTopic int
+
+ // Message pull Interval
+ PullInterval time.Duration
+
+ // Batch consumption size
+ ConsumeMessageBatchMaxSize int
+
+ // Batch pull size
+ PullBatchSize int32
+
+ // Whether update subscription relationship when every pull
+ PostSubscriptionWhenPull bool
+
+ // Max re-consume times. -1 means 16 times.
+ //
+ // If messages are re-consumed more than {@link #maxReconsumeTimes}
before success, it's be directed to a deletion
+ // queue waiting.
+ MaxReconsumeTimes int
+
+ // Suspending pulling time for cases requiring slow pulling like
flow-control scenario.
+ SuspendCurrentQueueTimeMillis time.Duration
+
+ // Maximum amount of time a message may block the consuming thread.
+ ConsumeTimeout time.Duration
+
+ ConsumerModel MessageModel
+ Strategy AllocateStrategy
+ ConsumeOrderly bool
+ FromWhere ConsumeFromWhere
+ // TODO traceDispatcher
+}
+
+// TODO hook
+type defaultConsumer struct {
+ /**
+ * Consumers of the same role is required to have exactly same
subscriptions and consumerGroup to correctly achieve
+ * load balance. It's required and needs to be globally unique.
+ * </p>
+ *
+ * See <a href="http://rocketmq.apache.org/docs/core-concept/">here</a>
for further discussion.
+ */
+ consumerGroup string
+ model MessageModel
+ allocate func(string, string, []*kernel.MessageQueue, []string)
[]*kernel.MessageQueue
+ unitMode bool
+ consumeOrderly bool
+ fromWhere ConsumeFromWhere
+
+ cType ConsumeType
+ client *kernel.RMQClient
+ mqChanged func(topic string, mqAll, mqDivided []*kernel.MessageQueue)
+ state kernel.ServiceState
+ pause bool
+ once sync.Once
+ option ConsumerOption
+ // key: int, hash(*kernel.MessageQueue)
+ // value: *processQueue
+ processQueueTable sync.Map
+
+ // key: topic(string)
+ // value: map[int]*kernel.MessageQueue
+ topicSubscribeInfoTable sync.Map
+
+ // key: topic
+ // value: *SubscriptionData
+ subscriptionDataTable sync.Map
+ storage OffsetStore
+ // chan for push consumer
+ prCh chan PullRequest
+}
+
+func (dc *defaultConsumer) persistConsumerOffset() {
+ err := dc.makeSureStateOK()
+ if err != nil {
+ rlog.Errorf("consumer state error: %s", err.Error())
+ return
+ }
+ mqs := make([]*kernel.MessageQueue, 0)
+ dc.processQueueTable.Range(func(key, value interface{}) bool {
+ mqs = append(mqs, key.(*kernel.MessageQueue))
+ return true
+ })
+ dc.storage.persist(mqs)
+}
+
+func (dc *defaultConsumer) updateTopicSubscribeInfo(topic string, mqs
[]*kernel.MessageQueue) {
+ _, exist := dc.subscriptionDataTable.Load(topic)
+ // does subscribe, if true, replace it
+ if exist {
+ mqSet := make(map[int]*kernel.MessageQueue, 0)
+ for idx := range mqs {
+ mq := mqs[idx]
+ mqSet[mq.HashCode()] = mq
+ }
+ dc.topicSubscribeInfoTable.Store(topic, mqs)
+ }
+}
+
+func (dc *defaultConsumer) isSubscribeTopicNeedUpdate(topic string) bool {
+ _, exist := dc.subscriptionDataTable.Load(topic)
+ if !exist {
+ return false
+ }
+ _, exist = dc.topicSubscribeInfoTable.Load(topic)
+ return !exist
+}
+
+func (dc *defaultConsumer) doBalance() {
+ dc.subscriptionDataTable.Range(func(key, value interface{}) bool {
+ topic := key.(string)
+ if strings.HasPrefix(topic, kernel.RetryGroupTopicPrefix) {
+ return true
+ }
+ v, exist := dc.topicSubscribeInfoTable.Load(topic)
+ if !exist {
+ rlog.Warnf("do balance of group: %s, but topic: %s does
not exist.", dc.consumerGroup, topic)
+ return true
+ }
+ mqs := v.([]*kernel.MessageQueue)
+ switch dc.model {
+ case BroadCasting:
+ changed := dc.updateProcessQueueTable(topic, mqs)
+ if changed {
+ dc.mqChanged(topic, mqs, mqs)
+ rlog.Infof("messageQueueChanged, Group: %s,
Topic: %s, MessageQueues: %v",
+ dc.consumerGroup, topic, mqs)
+ }
+ case Clustering:
+ cidAll := dc.findConsumerList(topic)
+ if cidAll == nil {
+ rlog.Warnf("do balance for Group: %s, Topic: %s
get consumer id list failed",
+ dc.consumerGroup, topic)
+ return true
+ }
+ mqAll := make([]*kernel.MessageQueue, len(mqs))
+ copy(mqAll, mqs)
+ sort.Strings(cidAll)
+ sort.SliceStable(mqAll, func(i, j int) bool {
+ v := strings.Compare(mqAll[i].Topic,
mqAll[j].Topic)
+ if v != 0 {
+ return v > 0
+ }
+
+ v = strings.Compare(mqAll[i].BrokerName,
mqAll[j].BrokerName)
+ if v != 0 {
+ return v > 0
+ }
+ return (mqAll[i].QueueId - mqAll[j].QueueId) > 0
+ })
+ allocateResult := dc.allocate(dc.consumerGroup,
dc.client.ClientID(), mqAll, cidAll)
+ changed := dc.updateProcessQueueTable(topic,
allocateResult)
+ if changed {
+ dc.mqChanged(topic, mqAll, allocateResult)
+ rlog.Infof("do balance result changed,
allocateMessageQueueStrategyName=%s, group=%s, "+
+ "topic=%s, clientId=%s, mqAllSize=%d,
cidAllSize=%d, rebalanceResultSize=%d, "+
+ "rebalanceResultSet=%v",
string(dc.option.Strategy), dc.consumerGroup, topic, dc.client.ClientID(),
len(mqAll),
+ len(cidAll), len(allocateResult),
allocateResult)
+
+ }
+ }
+ return true
+ })
+}
+
+func (dc *defaultConsumer) SubscriptionDataList() []*kernel.SubscriptionData {
+ result := make([]*kernel.SubscriptionData, 0)
+ dc.subscriptionDataTable.Range(func(key, value interface{}) bool {
+ result = append(result, value.(*kernel.SubscriptionData))
+ return true
+ })
+ return result
+}
+
+func (dc *defaultConsumer) makeSureStateOK() error {
+ // TODO log
+ return nil //dc.state == StateRunning
+}
+
+type lockBatchRequestBody struct {
+ ConsumerGroup string `json:"consumerGroup"`
+ ClientId string `json:"clientId"`
+ MQs []*kernel.MessageQueue `json:"mqSet"`
+}
+
+func (dc *defaultConsumer) lock(mq *kernel.MessageQueue) bool {
+ brokerResult := kernel.FindBrokerAddressInSubscribe(mq.BrokerName,
kernel.MasterId, true)
+
+ if brokerResult == nil {
+ return false
+ }
+
+ body := &lockBatchRequestBody{
+ ConsumerGroup: dc.consumerGroup,
+ ClientId: dc.client.ClientID(),
+ MQs: []*kernel.MessageQueue{mq},
+ }
+ lockedMQ := dc.doLock(brokerResult.BrokerAddr, body)
+ var lockOK bool
+ for idx := range lockedMQ {
+ _mq := lockedMQ[idx]
+ v, exist := dc.processQueueTable.Load(_mq)
+ if exist {
+ pq := v.(*ProcessQueue)
+ pq.locked = true
+ pq.lastConsumeTime = time.Now()
+ }
+ if _mq.Equals(mq) {
+ lockOK = true
+ }
+ }
+ rlog.Infof("the message queue lock %v, %s %s", lockOK,
dc.consumerGroup, mq.String())
+ return lockOK
+}
+
+func (dc *defaultConsumer) unlock(mq *kernel.MessageQueue, oneway bool) {
+ brokerResult := kernel.FindBrokerAddressInSubscribe(mq.BrokerName,
kernel.MasterId, true)
+
+ if brokerResult == nil {
+ return
+ }
+
+ body := &lockBatchRequestBody{
+ ConsumerGroup: dc.consumerGroup,
+ ClientId: dc.client.ClientID(),
+ MQs: []*kernel.MessageQueue{mq},
+ }
+ dc.doUnlock(brokerResult.BrokerAddr, body, oneway)
+ rlog.Warnf("unlock messageQueue. group:%s, clientId:%s, mq:%s",
+ dc.consumerGroup, dc.client.ClientID(), mq.String())
+}
+
+func (dc *defaultConsumer) lockAll(mq kernel.MessageQueue) {
+ mqMapSet := dc.buildProcessQueueTableByBrokerName()
+ for broker, mqs := range mqMapSet {
+ if len(mqs) == 0 {
+ continue
+ }
+ brokerResult := kernel.FindBrokerAddressInSubscribe(broker,
kernel.MasterId, true)
+ if brokerResult == nil {
+ continue
+ }
+ body := &lockBatchRequestBody{
+ ConsumerGroup: dc.consumerGroup,
+ ClientId: dc.client.ClientID(),
+ MQs: mqs,
+ }
+ lockedMQ := dc.doLock(brokerResult.BrokerAddr, body)
+ set := make(map[int]bool, 0)
+ for idx := range lockedMQ {
+ _mq := lockedMQ[idx]
+ v, exist := dc.processQueueTable.Load(_mq)
+ if exist {
+ pq := v.(*ProcessQueue)
+ pq.locked = true
+ pq.lastConsumeTime = time.Now()
+ }
+ set[_mq.HashCode()] = true
+ }
+ for idx := range mqs {
+ _mq := mqs[idx]
+ if !set[_mq.HashCode()] {
+ v, exist := dc.processQueueTable.Load(_mq)
+ if exist {
+ pq := v.(*ProcessQueue)
+ pq.locked = true
+ pq.lastLockTime = time.Now()
+ rlog.Warnf("the message queue: %s
locked Failed, Group: %s", mq.String(), dc.consumerGroup)
+ }
+ }
+ }
+ }
+}
+
+func (dc *defaultConsumer) unlockAll(oneway bool) {
+ mqMapSet := dc.buildProcessQueueTableByBrokerName()
+ for broker, mqs := range mqMapSet {
+ if len(mqs) == 0 {
+ continue
+ }
+ brokerResult := kernel.FindBrokerAddressInSubscribe(broker,
kernel.MasterId, true)
+ if brokerResult == nil {
+ continue
+ }
+ body := &lockBatchRequestBody{
+ ConsumerGroup: dc.consumerGroup,
+ ClientId: dc.client.ClientID(),
+ MQs: mqs,
+ }
+ dc.doUnlock(brokerResult.BrokerAddr, body, oneway)
+ for idx := range mqs {
+ _mq := mqs[idx]
+ v, exist := dc.processQueueTable.Load(_mq)
+ if exist {
+ v.(*ProcessQueue).locked = false
+ rlog.Warnf("the message queue: %s locked
Failed, Group: %s", _mq.String(), dc.consumerGroup)
+ }
+ }
+ }
+}
+
+func (dc *defaultConsumer) doLock(addr string, body *lockBatchRequestBody)
[]kernel.MessageQueue {
+ data, _ := json.Marshal(body)
+ request := remote.NewRemotingCommand(kernel.ReqLockBatchMQ, nil, data)
+ response, err := remote.InvokeSync(addr, request, 1*time.Second)
+ if err != nil {
+ rlog.Errorf("lock mq to broker: %s error %s", addr, err.Error())
+ return nil
+ }
+ lockOKMQSet := struct {
+ MQs []kernel.MessageQueue `json:"lockOKMQSet"`
+ }{}
+ err = json.Unmarshal(response.Body, &lockOKMQSet)
+ if err != nil {
+ rlog.Errorf("Unmarshal lock mq body error %s", err.Error())
+ return nil
+ }
+ return lockOKMQSet.MQs
+}
+
+func (dc *defaultConsumer) doUnlock(addr string, body *lockBatchRequestBody,
oneway bool) {
+ data, _ := json.Marshal(body)
+ request := remote.NewRemotingCommand(kernel.ReqUnlockBatchMQ, nil, data)
+ if oneway {
+ err := remote.InvokeOneWay(addr, request)
+ if err != nil {
+ rlog.Errorf("lock mq to broker with oneway: %s error
%s", addr, err.Error())
+ }
+ } else {
+ response, err := remote.InvokeSync(addr, request, 1*time.Second)
+ if err != nil {
+ rlog.Errorf("lock mq to broker: %s error %s", addr,
err.Error())
+ }
+ if response.Code != kernel.ResSuccess {
+ // TODO error
+ }
+ }
+}
+
+func (dc *defaultConsumer) buildProcessQueueTableByBrokerName()
map[string][]*kernel.MessageQueue {
+ result := make(map[string][]*kernel.MessageQueue, 0)
+
+ dc.processQueueTable.Range(func(key, value interface{}) bool {
+ mq := key.(*kernel.MessageQueue)
+ mqs, exist := result[mq.BrokerName]
+ if !exist {
+ mqs = make([]*kernel.MessageQueue, 0)
+ }
+ mqs = append(mqs, mq)
+ result[mq.BrokerName] = mqs
+ return true
+ })
+
+ return result
+}
+
+func (dc *defaultConsumer) updateProcessQueueTable(topic string, mqs
[]*kernel.MessageQueue) bool {
+ var changed bool
+ mqSet := make(map[*kernel.MessageQueue]bool)
+ for idx := range mqs {
+ mqSet[mqs[idx]] = true
+ }
+ // TODO
+ dc.processQueueTable.Range(func(key, value interface{}) bool {
+ mq := key.(*kernel.MessageQueue)
+ pq := value.(*ProcessQueue)
+ if mq.Topic == topic {
+ if !mqSet[mq] {
+ pq.dropped = true
+ if dc.removeUnnecessaryMessageQueue(mq, pq) {
+ delete(mqSet, mq)
+ changed = true
+ rlog.Infof("do defaultConsumer,
Group:%s, remove unnecessary mq: %s", dc.consumerGroup, mq.String())
+ }
+ } else if pq.isPullExpired() && dc.cType ==
_PushConsume {
+ pq.dropped = true
+ if dc.removeUnnecessaryMessageQueue(mq, pq) {
+ delete(mqSet, mq)
+ changed = true
+ rlog.Infof("do defaultConsumer,
Group:%s, remove unnecessary mq: %s, "+
+ "because pull was paused, so
try to fixed it", dc.consumerGroup, mq)
+ }
+ }
+ }
+ return true
+ })
+
+ if dc.cType == _PushConsume {
+ for mq := range mqSet {
+ if dc.consumeOrderly && !dc.lock(mq) {
+ rlog.Warnf("do defaultConsumer, Group:%s add a
new mq failed, %s, because lock failed",
+ dc.consumerGroup, mq.String())
+ continue
+ }
+ dc.storage.remove(mq)
+ nextOffset := dc.computePullFromWhere(mq)
+ if nextOffset >= 0 {
+ _, exist := dc.processQueueTable.Load(mq)
+ if exist {
+ rlog.Debugf("do defaultConsumer, Group:
%s, mq already exist, %s", dc.consumerGroup, mq.String())
+ } else {
+ rlog.Infof("do defaultConsumer, Group:
%s, add a new mq, %s", dc.consumerGroup, mq.String())
+ pq := &ProcessQueue{}
+ dc.processQueueTable.Store(mq, pq)
+ pr := PullRequest{
+ consumerGroup: dc.consumerGroup,
+ mq: mq,
+ pq: pq,
+ nextOffset: nextOffset,
+ }
+ dc.prCh <- pr
+ changed = true
+ }
+ } else {
+ rlog.Warnf("do defaultConsumer failed,
Group:%s, add new mq failed, {}", dc.consumerGroup, mq)
+ }
+ }
+ }
+
+ return changed
+}
+
+func (dc *defaultConsumer) removeUnnecessaryMessageQueue(mq
*kernel.MessageQueue, pq *ProcessQueue) bool {
+ dc.storage.persist([]*kernel.MessageQueue{mq})
+ dc.storage.remove(mq)
+ if dc.cType == _PushConsume && dc.consumeOrderly && Clustering ==
dc.model {
+ // TODO
+ }
+ return true
+}
+
+func (dc *defaultConsumer) computePullFromWhere(mq *kernel.MessageQueue) int64
{
+ if dc.cType == _PullConsume {
+ return 0
+ }
+ var result = int64(-1)
+ lastOffset := dc.storage.read(mq, _ReadFromStore)
+ if lastOffset >= 0 {
+ result = lastOffset
+ } else {
+ switch dc.fromWhere {
+ case ConsumeFromLastOffset:
+ if lastOffset == -1 {
+ if strings.HasPrefix(mq.Topic,
kernel.RetryGroupTopicPrefix) {
+ lastOffset, err :=
kernel.QueryMaxOffset(mq.Topic, mq.QueueId)
+ if err == nil {
+ result = lastOffset
+ } else {
+ rlog.Warnf("query max offset
of: [%s:%d] error, %s", mq.Topic, mq.QueueId, err.Error())
+ }
+ }
+ }
+ case ConsumeFromFirstOffset:
+ if lastOffset == -1 {
+ result = 0
+ }
+ case ConsumeFromTimestamp:
+ if lastOffset == -1 {
+ if strings.HasPrefix(mq.Topic,
kernel.RetryGroupTopicPrefix) {
+ lastOffset, err :=
kernel.QueryMaxOffset(mq.Topic, mq.QueueId)
+ if err == nil {
+ result = lastOffset
+ } else {
+ rlog.Warnf("query max offset
of: [%s:%d] error, %s", mq.Topic, mq.QueueId, err.Error())
+ }
+ } else {
+ // TODO parse timestamp
+ }
+ }
+ default:
+ }
+ }
+
+ return result
+}
+
+func (dc *defaultConsumer) findConsumerList(topic string) []string {
+ brokerAddr := kernel.FindBrokerAddrByTopic(topic)
+ if brokerAddr == "" {
+ kernel.UpdateTopicRouteInfo(topic)
+ brokerAddr = kernel.FindBrokerAddrByTopic(topic)
+ }
+
+ if brokerAddr != "" {
+ req := &kernel.GetConsumerList{
+ ConsumerGroup: dc.consumerGroup,
+ }
+ cmd :=
remote.NewRemotingCommand(kernel.ReqGetConsumerListByGroup, req, nil)
+ res, err := remote.InvokeSync(brokerAddr, cmd, 3*time.Second)
// TODO 超时机制有问题
+ if err != nil {
+ rlog.Errorf("get consumer list of [%s] from %s error:
%s", dc.consumerGroup, brokerAddr, err.Error())
+ return nil
+ }
+ result := gjson.ParseBytes(res.Body)
+ list := make([]string, 0)
+ arr := result.Get("consumerIdList").Array()
+ for idx := range arr {
+ list = append(list, arr[idx].String())
+ }
+ return list
+ }
+ return nil
+}
+
+func buildSubscriptionData(topic string, selector MessageSelector)
*kernel.SubscriptionData {
+ subData := &kernel.SubscriptionData{
+ Topic: topic,
+ SubString: selector.Expression,
+ ExpType: string(selector.Type),
+ }
+
+ if selector.Type != "" && selector.Type != TAG {
+ return subData
+ }
+
+ if selector.Expression == "" || selector.Expression == _SubAll {
+ subData.ExpType = string(TAG)
+ subData.SubString = _SubAll
+ } else {
+ tags := strings.Split(selector.Expression, "\\|\\|")
+ for idx := range tags {
+ trimString := strings.Trim(tags[idx], " ")
+ if trimString != "" {
+ if !subData.Tags[trimString] {
+ subData.Tags[trimString] = true
+ }
+ hCode := utils.HashString(trimString)
+ if !subData.Codes[int32(hCode)] {
+ subData.Codes[int32(hCode)] = true
+ }
+ }
+ }
+ }
+ return subData
+}
+
+func getNextQueueOf(topic string) *kernel.MessageQueue {
+ queues, err := kernel.FetchSubscribeMessageQueues(topic)
+ if err != nil && len(queues) > 0 {
+ rlog.Error(err.Error())
+ return nil
+ }
+ var index int64
+ v, exist := queueCounterTable.Load(topic)
+ if !exist {
+ index = -1
+ queueCounterTable.Store(topic, 0)
+ } else {
+ index = v.(int64)
+ }
+
+ return queues[int(atomic.AddInt64(&index, 1))%len(queues)]
+}
+
+func buildSysFlag(commitOffset, suspend, subscription, classFilter bool) int32
{
+ var flag int32 = 0
+ if commitOffset {
+ flag |= 0x1 << 0
+ }
+
+ if suspend {
+ flag |= 0x1 << 1
+ }
+
+ if subscription {
+ flag |= 0x1 << 2
+ }
+
+ if classFilter {
+ flag |= 0x1 << 3
+ }
+
+ return flag
+}
+
+func clearCommitOffsetFlag(sysFlag int32) int32 {
+ return sysFlag & (^0x1 << 0)
+}
+
+func tryFindBroker(mq *kernel.MessageQueue) *kernel.FindBrokerResult {
+ result := kernel.FindBrokerAddressInSubscribe(mq.BrokerName,
recalculatePullFromWhichNode(mq), false)
+
+ if result == nil {
+ kernel.UpdateTopicRouteInfo(mq.Topic)
+ }
+ return kernel.FindBrokerAddressInSubscribe(mq.BrokerName,
recalculatePullFromWhichNode(mq), false)
+}
+
+var (
+ pullFromWhichNodeTable sync.Map
+)
+
+func updatePullFromWhichNode(mq *kernel.MessageQueue, brokerId int64) {
+ pullFromWhichNodeTable.Store(mq.HashCode(), brokerId)
+}
+
+func recalculatePullFromWhichNode(mq *kernel.MessageQueue) int64 {
+ v, exist := pullFromWhichNodeTable.Load(mq.HashCode())
+ if exist {
+ return v.(int64)
+ }
+ return kernel.MasterId
+}
diff --git a/consumer/offset_store.go b/consumer/offset_store.go
new file mode 100644
index 0000000..a11280d
--- /dev/null
+++ b/consumer/offset_store.go
@@ -0,0 +1,55 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package consumer
+
+import "github.com/apache/rocketmq-client-go/kernel"
+
+type readType int
+
+const (
+ _ReadFromMemory readType = iota
+ _ReadFromStore
+ _ReadMemoryThenStore
+)
+
+type OffsetStore interface {
+ load()
+ persist(mqs []*kernel.MessageQueue)
+ remove(mq *kernel.MessageQueue)
+ read(mq *kernel.MessageQueue, t readType) int64
+ update(mq *kernel.MessageQueue, offset int64, increaseOnly bool)
+}
+
+type localFileOffsetStore struct {
+}
+
+func (local *localFileOffsetStore) load()
{}
+func (local *localFileOffsetStore) persist(mqs []*kernel.MessageQueue)
{}
+func (local *localFileOffsetStore) remove(mq *kernel.MessageQueue)
{}
+func (local *localFileOffsetStore) read(mq *kernel.MessageQueue, t readType)
int64 { return 0 }
+func (local *localFileOffsetStore) update(mq *kernel.MessageQueue, offset
int64, increaseOnly bool) {}
+
+type remoteBrokerOffsetStore struct {
+}
+
+func (remote *remoteBrokerOffsetStore) load()
{}
+func (remote *remoteBrokerOffsetStore) persist(mqs []*kernel.MessageQueue)
{}
+func (remote *remoteBrokerOffsetStore) remove(mq *kernel.MessageQueue)
{}
+func (remote *remoteBrokerOffsetStore) read(mq *kernel.MessageQueue, t
readType) int64 { return 0 }
+func (remote *remoteBrokerOffsetStore) update(mq *kernel.MessageQueue, offset
int64, increaseOnly bool) {
+}
diff --git a/consumer/process_queue.go b/consumer/process_queue.go
new file mode 100644
index 0000000..0db0766
--- /dev/null
+++ b/consumer/process_queue.go
@@ -0,0 +1,100 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package consumer
+
+import (
+ "container/list"
+ "github.com/apache/rocketmq-client-go/kernel"
+ "sync"
+ "time"
+)
+
+const (
+ _RebalanceLockMaxTime = 30 * time.Second
+ _RebalanceInterval = 20 * time.Second
+ _PullMaxIdleTime = 120 * time.Second
+)
+
+type ProcessQueue struct {
+ mutex sync.RWMutex
+ msgCache list.List // sorted
+ cachedMsgCount int
+ cachedMsgSize int64
+ consumeLock sync.Mutex
+ consumingMsgOrderlyTreeMap sync.Map
+ tryUnlockTimes int64
+ queueOffsetMax int64
+ dropped bool
+ lastPullTime time.Time
+ lastConsumeTime time.Time
+ locked bool
+ lastLockTime time.Time
+ consuming bool
+ msgAccCnt int64
+ once sync.Once
+}
+
+func (pq *ProcessQueue) isPullExpired() bool {
+ return false
+}
+
+func (pq *ProcessQueue) getMaxSpan() int {
+ return pq.msgCache.Len()
+}
+
+func (pq *ProcessQueue) putMessage(messages []*kernel.MessageExt) {
+ pq.once.Do(func() {
+ pq.msgCache.Init()
+ })
+ localList := list.New()
+ for idx := range messages {
+ localList.PushBack(messages[idx])
+ }
+ pq.mutex.Lock()
+ pq.msgCache.PushBackList(localList)
+ pq.mutex.Unlock()
+}
+
+func (pq *ProcessQueue) removeMessage(number int) int {
+ i := 0
+ pq.mutex.Lock()
+ for ; i < number && pq.msgCache.Len() > 0; i++ {
+ pq.msgCache.Remove(pq.msgCache.Front())
+ }
+ pq.mutex.Unlock()
+ return i
+}
+
+func (pq *ProcessQueue) takeMessages(number int) []*kernel.MessageExt {
+ for pq.msgCache.Len() == 0 {
+ time.Sleep(10 * time.Millisecond)
+ }
+ result := make([]*kernel.MessageExt, number)
+ i := 0
+ pq.mutex.Lock()
+ for ; i < number; i++ {
+ e := pq.msgCache.Front()
+ if e == nil {
+ break
+ }
+ result[i] = e.Value.(*kernel.MessageExt)
+ pq.msgCache.Remove(e)
+ }
+ pq.mutex.Unlock()
+ return result[:i]
+}
diff --git a/consumer/pull_consumer.go b/consumer/pull_consumer.go
new file mode 100644
index 0000000..1b4f950
--- /dev/null
+++ b/consumer/pull_consumer.go
@@ -0,0 +1,198 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package consumer
+
+import (
+ "context"
+ "errors"
+ "fmt"
+ "github.com/apache/rocketmq-client-go/kernel"
+ "strconv"
+ "sync"
+)
+
+type MessageSelector struct {
+ Type ExpressionType
+ Expression string
+}
+
+type PullConsumer interface {
+ Start()
+ Shutdown()
+ Pull(ctx context.Context, topic string, selector MessageSelector,
numbers int) (*kernel.PullResult, error)
+}
+
+var (
+ queueCounterTable sync.Map
+)
+
+func NewConsumer(config ConsumerOption) *defaultPullConsumer {
+ return &defaultPullConsumer{
+ option: config,
+ }
+}
+
+type defaultPullConsumer struct {
+ state kernel.ServiceState
+ option ConsumerOption
+ client *kernel.RMQClient
+ GroupName string
+ Model MessageModel
+ UnitMode bool
+}
+
+func (c *defaultPullConsumer) Start() {
+ c.state = kernel.StateRunning
+}
+
+func (c *defaultPullConsumer) Pull(ctx context.Context, topic string, selector
MessageSelector, numbers int) (*kernel.PullResult, error) {
+ mq := getNextQueueOf(topic)
+ if mq == nil {
+ return nil, fmt.Errorf("prepard to pull topic: %s, but no queue
is founded", topic)
+ }
+
+ data := buildSubscriptionData(mq.Topic, selector)
+ result, err := c.pull(context.Background(), mq, data,
c.nextOffsetOf(mq), numbers)
+
+ if err != nil {
+ return nil, err
+ }
+
+ processPullResult(mq, result, data)
+ return result, nil
+}
+
+// SubscribeWithChan ack manually
+func (c *defaultPullConsumer) SubscribeWithChan(topic, selector
MessageSelector) (chan *kernel.Message, error) {
+ return nil, nil
+}
+
+// SubscribeWithFunc ack automatic
+func (c *defaultPullConsumer) SubscribeWithFunc(topic, selector
MessageSelector,
+ f func(msg *kernel.Message) ConsumeResult) error {
+ return nil
+}
+
+func (c *defaultPullConsumer) ACK(msg *kernel.Message, result ConsumeResult) {
+
+}
+
+func (c *defaultPullConsumer) pull(ctx context.Context, mq
*kernel.MessageQueue, data *kernel.SubscriptionData,
+ offset int64, numbers int) (*kernel.PullResult, error) {
+ err := c.makeSureStateOK()
+ if err != nil {
+ return nil, err
+ }
+
+ if mq == nil {
+ return nil, errors.New("MessageQueue is nil")
+ }
+
+ if offset < 0 {
+ return nil, errors.New("offset < 0")
+ }
+
+ if numbers <= 0 {
+ numbers = 1
+ }
+ c.subscriptionAutomatically(mq.Topic)
+
+ brokerResult := tryFindBroker(mq)
+ if brokerResult == nil {
+ return nil, fmt.Errorf("the broker %s does not exist",
mq.BrokerName)
+ }
+
+ if (data.ExpType == string(TAG)) && brokerResult.BrokerVersion <
kernel.V4_1_0 {
+ return nil, fmt.Errorf("the broker [%s, %v] does not upgrade to
support for filter message by %v",
+ mq.BrokerName, brokerResult.BrokerVersion, data.ExpType)
+ }
+
+ sysFlag := buildSysFlag(false, true, true, false)
+
+ if brokerResult.Slave {
+ sysFlag = clearCommitOffsetFlag(sysFlag)
+ }
+ pullRequest := &kernel.PullMessageRequest{
+ ConsumerGroup: c.GroupName,
+ Topic: mq.Topic,
+ QueueId: int32(mq.QueueId),
+ QueueOffset: offset,
+ MaxMsgNums: int32(numbers),
+ SysFlag: sysFlag,
+ CommitOffset: 0,
+ SuspendTimeoutMillis: _BrokerSuspendMaxTime,
+ SubExpression: data.SubString,
+ ExpressionType: string(data.ExpType),
+ }
+
+ if data.ExpType == string(TAG) {
+ pullRequest.SubVersion = 0
+ } else {
+ pullRequest.SubVersion = data.SubVersion
+ }
+
+ // TODO computePullFromWhichFilterServer
+ return c.client.PullMessage(ctx, brokerResult.BrokerAddr, pullRequest)
+}
+
+func (c *defaultPullConsumer) makeSureStateOK() error {
+ if c.state != kernel.StateRunning {
+ return fmt.Errorf("the consumer state is [%d], not running",
c.state)
+ }
+ return nil
+}
+
+func (c *defaultPullConsumer) subscriptionAutomatically(topic string) {
+ // TODO
+}
+
+func (c *defaultPullConsumer) nextOffsetOf(queue *kernel.MessageQueue) int64 {
+ return 0
+}
+
+func processPullResult(mq *kernel.MessageQueue, result *kernel.PullResult,
data *kernel.SubscriptionData) {
+ updatePullFromWhichNode(mq, result.SuggestWhichBrokerId)
+ switch result.Status {
+ case kernel.PullFound:
+ msgs := result.GetMessageExts()
+ msgListFilterAgain := msgs
+ if len(data.Tags) > 0 && data.ClassFilterMode {
+ msgListFilterAgain = make([]*kernel.MessageExt,
len(msgs))
+ for _, msg := range msgs {
+ _, exist := data.Tags[msg.GetTags()]
+ if exist {
+ msgListFilterAgain =
append(msgListFilterAgain, msg)
+ }
+ }
+ }
+
+ // TODO hook
+
+ for _, msg := range msgListFilterAgain {
+ traFlag, _ :=
strconv.ParseBool(msg.Properties[kernel.PropertyTransactionPrepared])
+ if traFlag {
+ msg.TransactionId =
msg.Properties[kernel.PropertyUniqueClientMessageIdKeyIndex]
+ }
+
+ msg.Properties[kernel.PropertyMinOffset] =
strconv.FormatInt(result.MinOffset, 10)
+ msg.Properties[kernel.PropertyMaxOffset] =
strconv.FormatInt(result.MaxOffset, 10)
+ }
+
+ result.SetMessageExts(msgListFilterAgain)
+ }
+}
diff --git a/consumer/pull_consumer_test.go b/consumer/pull_consumer_test.go
new file mode 100644
index 0000000..f7bc454
--- /dev/null
+++ b/consumer/pull_consumer_test.go
@@ -0,0 +1,18 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package consumer
diff --git a/consumer/push_consumer.go b/consumer/push_consumer.go
new file mode 100644
index 0000000..58f4c68
--- /dev/null
+++ b/consumer/push_consumer.go
@@ -0,0 +1,606 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package consumer
+
+import (
+ "context"
+ "errors"
+ "fmt"
+ "github.com/apache/rocketmq-client-go/kernel"
+ "github.com/apache/rocketmq-client-go/rlog"
+ "math"
+ "strconv"
+ "time"
+)
+
+// In most scenarios, this is the mostly recommended usage to consume messages.
+//
+// Technically speaking, this push client is virtually a wrapper of the
underlying pull service. Specifically, on
+// arrival of messages pulled from brokers, it roughly invokes the registered
callback handler to feed the messages.
+//
+// See quick start/Consumer in the example module for a typical usage.
+//
+// <strong>Thread Safety:</strong> After initialization, the instance can be
regarded as thread-safe.
+type ConsumeResult int
+
+const (
+ Mb = 1024 * 1024
+ ConsumeSuccess ConsumeResult = iota
+ ConsumeRetryLater
+)
+
+type PushConsumer interface {
+ Start() error
+ Shutdown()
+ Subscribe(topic string, selector MessageSelector,
+ f func(*ConsumeMessageContext, []*kernel.MessageExt)
(ConsumeResult, error)) error
+}
+
+type pushConsumer struct {
+ *defaultConsumer
+ /**
+ * Backtracking consumption time with second precision. Time format is
+ * 20131223171201<br>
+ * Implying Seventeen twelve and 01 seconds on December 23, 2013
year<br>
+ * Default backtracking consumption time Half an hour ago.
+ */
+ ConsumeTimestamp time.Duration
+ queueFlowControlTimes int
+ queueMaxSpanFlowControlTimes int
+ consume func(*ConsumeMessageContext,
[]*kernel.MessageExt) (ConsumeResult, error)
+ submitToConsume func(*ProcessQueue, *kernel.MessageQueue)
+ subscribedTopic map[string]string
+}
+
+func NewPushConsumer(consumerGroup string, opt ConsumerOption) PushConsumer {
+ dc := &defaultConsumer{
+ consumerGroup: consumerGroup,
+ cType: _PushConsume,
+ state: kernel.StateCreateJust,
+ prCh: make(chan PullRequest, 4),
+ model: opt.ConsumerModel,
+ consumeOrderly: opt.ConsumeOrderly,
+ fromWhere: opt.FromWhere,
+ option: opt,
+ }
+
+ switch opt.Strategy {
+ case StrategyAveragely:
+ dc.allocate = allocateByAveragely
+ case StrategyAveragelyCircle:
+ dc.allocate = allocateByAveragelyCircle
+ case StrategyConfig:
+ dc.allocate = allocateByConfig
+ case StrategyConsistentHash:
+ dc.allocate = allocateByConsistentHash
+ case StrategyMachineNearby:
+ dc.allocate = allocateByMachineNearby
+ case StrategyMachineRoom:
+ dc.allocate = allocateByMachineRoom
+ default:
+ dc.allocate = allocateByAveragely
+ }
+
+ p := &pushConsumer{
+ defaultConsumer: dc,
+ ConsumeTimestamp: 30 * time.Minute,
+ subscribedTopic: make(map[string]string, 0),
+ }
+ dc.mqChanged = p.messageQueueChanged
+ if p.consumeOrderly {
+ p.submitToConsume = p.consumeMessageOrderly
+ } else {
+ p.submitToConsume = p.consumeMessageCurrently
+ }
+ return p
+}
+
+func (pc *pushConsumer) Start() error {
+ var err error
+ pc.once.Do(func() {
+ rlog.Infof("the consumerGroup=%s start beginning.
messageModel=%v, unitMode=%v",
+ pc.consumerGroup, pc.model, pc.unitMode)
+ pc.state = kernel.StateStartFailed
+ pc.validate()
+
+ // set retry topic
+ if pc.model == Clustering {
+ retryTopic := kernel.GetRetryTopic(pc.consumerGroup)
+ pc.subscriptionDataTable.Store(retryTopic,
buildSubscriptionData(retryTopic,
+ MessageSelector{TAG, _SubAll}))
+ }
+
+ pc.client =
kernel.GetOrNewRocketMQClient(pc.option.ClientOption)
+ if pc.model == Clustering {
+ pc.option.ChangeInstanceNameToPID()
+ pc.storage = &remoteBrokerOffsetStore{}
+ } else {
+ pc.storage = &localFileOffsetStore{}
+ }
+ pc.storage.load()
+ go func() {
+ // todo start clean msg expired
+ // TODO quit
+ for {
+ pr := <-pc.prCh
+ go func() {
+ fmt.Println(pr.String())
+ pc.pullMessage(&pr)
+ }()
+ }
+ }()
+
+ err = pc.client.RegisterConsumer(pc.consumerGroup, pc)
+ if err != nil {
+ pc.state = kernel.StateCreateJust
+ rlog.Errorf("the consumer group: [%s] has been created,
specify another name.", pc.consumerGroup)
+ err = errors.New("consumer group has been created")
+ return
+ }
+ pc.client.UpdateTopicRouteInfo()
+ pc.client.RebalanceImmediately()
+ pc.client.Start()
+ pc.state = kernel.StateRunning
+ })
+
+ pc.client.UpdateTopicRouteInfo()
+ pc.client.RebalanceImmediately()
+ pc.client.CheckClientInBroker()
+ pc.client.SendHeartbeatToAllBrokerWithLock()
+ return err
+}
+
+func (pc *pushConsumer) Shutdown() {}
+
+func (pc *pushConsumer) Subscribe(topic string, selector MessageSelector,
+ f func(*ConsumeMessageContext, []*kernel.MessageExt) (ConsumeResult,
error)) error {
+ if pc.state != kernel.StateCreateJust {
+ return errors.New("subscribe topic only started before")
+ }
+ data := buildSubscriptionData(topic, selector)
+ pc.subscriptionDataTable.Store(topic, data)
+ pc.subscribedTopic[topic] = ""
+ pc.consume = f
+ return nil
+}
+
+func (pc *pushConsumer) Rebalance() {
+ pc.defaultConsumer.doBalance()
+}
+
+func (pc *pushConsumer) PersistConsumerOffset() {
+ pc.defaultConsumer.persistConsumerOffset()
+}
+
+func (pc *pushConsumer) UpdateTopicSubscribeInfo(topic string, mqs
[]*kernel.MessageQueue) {
+ pc.defaultConsumer.updateTopicSubscribeInfo(topic, mqs)
+}
+
+func (pc *pushConsumer) IsSubscribeTopicNeedUpdate(topic string) bool {
+ return pc.defaultConsumer.isSubscribeTopicNeedUpdate(topic)
+}
+
+func (pc *pushConsumer) SubscriptionDataList() []*kernel.SubscriptionData {
+ return pc.defaultConsumer.SubscriptionDataList()
+}
+
+func (pc *pushConsumer) IsUnitMode() bool {
+ return pc.unitMode
+}
+
+func (pc *pushConsumer) messageQueueChanged(topic string, mqAll, mqDivided
[]*kernel.MessageQueue) {
+ // TODO
+}
+
+func (pc *pushConsumer) validate() {
+ kernel.ValidateGroup(pc.consumerGroup)
+
+ if pc.consumerGroup == kernel.DefaultConsumerGroup {
+ // TODO FQA
+ rlog.Fatalf("consumerGroup can't equal [%s], please specify
another one.", kernel.DefaultConsumerGroup)
+ }
+
+ if len(pc.subscribedTopic) == 0 {
+ rlog.Fatal("number of subscribed topics is 0.")
+ }
+
+ if pc.option.ConsumeConcurrentlyMaxSpan < 1 ||
pc.option.ConsumeConcurrentlyMaxSpan > 65535 {
+ if pc.option.ConsumeConcurrentlyMaxSpan == 0 {
+ pc.option.ConsumeConcurrentlyMaxSpan = 1000
+ } else {
+ rlog.Fatal("option.ConsumeConcurrentlyMaxSpan out of
range [1, 65535]")
+ }
+ }
+
+ if pc.option.PullThresholdForQueue < 1 ||
pc.option.PullThresholdForQueue > 65535 {
+ if pc.option.PullThresholdForQueue == 0 {
+ pc.option.PullThresholdForQueue = 1024
+ } else {
+ rlog.Fatal("option.PullThresholdForQueue out of range
[1, 65535]")
+ }
+ }
+
+ if pc.option.PullThresholdForTopic < 1 ||
pc.option.PullThresholdForTopic > 6553500 {
+ if pc.option.PullThresholdForTopic == 0 {
+ pc.option.PullThresholdForTopic = 102400
+ } else {
+ rlog.Fatal("option.PullThresholdForTopic out of range
[1, 6553500]")
+ }
+ }
+
+ if pc.option.PullThresholdSizeForQueue < 1 ||
pc.option.PullThresholdSizeForQueue > 1024 {
+ if pc.option.PullThresholdSizeForQueue == 0 {
+ pc.option.PullThresholdSizeForQueue = 512
+ } else {
+ rlog.Fatal("option.PullThresholdSizeForQueue out of
range [1, 1024]")
+ }
+ }
+
+ if pc.option.PullThresholdSizeForTopic < 1 ||
pc.option.PullThresholdSizeForTopic > 102400 {
+ if pc.option.PullThresholdSizeForTopic == 0 {
+ pc.option.PullThresholdSizeForTopic = 51200
+ } else {
+ rlog.Fatal("option.PullThresholdSizeForTopic out of
range [1, 102400]")
+ }
+ }
+
+ if pc.option.PullInterval < 0 || pc.option.PullInterval > 65535 {
+ rlog.Fatal("option.PullInterval out of range [0, 65535]")
+ }
+
+ if pc.option.ConsumeMessageBatchMaxSize < 1 ||
pc.option.ConsumeMessageBatchMaxSize > 1024 {
+ if pc.option.ConsumeMessageBatchMaxSize == 0 {
+ pc.option.ConsumeMessageBatchMaxSize = 512
+ } else {
+ rlog.Fatal("option.ConsumeMessageBatchMaxSize out of
range [1, 1024]")
+ }
+ }
+
+ if pc.option.PullBatchSize < 1 || pc.option.PullBatchSize > 1024 {
+ if pc.option.PullBatchSize == 0 {
+ pc.option.PullBatchSize = 1
+ } else {
+ rlog.Fatal("option.PullBatchSize out of range [1,
1024]")
+ }
+ }
+}
+
+func (pc *pushConsumer) pullMessage(request *PullRequest) {
+ rlog.Infof("start a nwe Pull Message task %s for [%s]",
request.String(), pc.consumerGroup)
+ var sleepTime time.Duration
+ pq := request.pq
+ go func() {
+ for {
+ pc.submitToConsume(request.pq, request.mq)
+ }
+ }()
+ for {
+ NEXT:
+ if pq.dropped {
+ rlog.Infof("the request: [%s] was dropped, so stop
task", request.String())
+ return
+ }
+ if sleepTime > 0 {
+ rlog.Infof("pull MessageQueue: %d sleep %d ms",
request.mq.QueueId, sleepTime/time.Millisecond)
+ time.Sleep(sleepTime)
+ }
+ // reset time
+ sleepTime = pc.option.PullInterval
+ pq.lastPullTime = time.Now()
+ err := pc.makeSureStateOK()
+ rlog.Debugf("pull MessageQueue: %d", request.mq.QueueId)
+ if err != nil {
+ rlog.Warnf("consumer state error: %s", err.Error())
+ sleepTime = _PullDelayTimeWhenError
+ goto NEXT
+ }
+
+ if pc.pause {
+ rlog.Infof("consumer [%s] of [%s] was paused, execute
pull request [%s] later",
+ pc.option.InstanceName, pc.consumerGroup,
request.String())
+ sleepTime = _PullDelayTimeWhenSuspend
+ goto NEXT
+ }
+
+ cachedMessageSizeInMiB := int(pq.cachedMsgSize / Mb)
+ if pq.cachedMsgCount > pc.option.PullThresholdForQueue {
+ if pc.queueFlowControlTimes%1000 == 0 {
+ rlog.Warnf("the cached message count exceeds
the threshold %d, so do flow control, "+
+ "minOffset=%d, maxOffset=%d, count=%d,
size=%d MiB, pullRequest=%s, flowControlTimes=%d",
+ pc.option.PullThresholdForQueue, 0,
pq.msgCache.Front().Value.(int64),
+ pq.msgCache.Back().Value.(int64),
+ pq.msgCache, cachedMessageSizeInMiB,
request.String(), pc.queueFlowControlTimes)
+ }
+ pc.queueFlowControlTimes++
+ sleepTime = _PullDelayTimeWhenFlowControl
+ goto NEXT
+ }
+
+ if cachedMessageSizeInMiB > pc.option.PullThresholdSizeForQueue
{
+ if pc.queueFlowControlTimes%1000 == 0 {
+ rlog.Warnf("the cached message size exceeds the
threshold %d MiB, so do flow control, "+
+ "minOffset=%d, maxOffset=%d, count=%d,
size=%d MiB, pullRequest=%s, flowControlTimes=%d",
+ pc.option.PullThresholdSizeForQueue, 0,
//processQueue.getMsgTreeMap().firstKey(),
+ 0, // TODO
processQueue.getMsgTreeMap().lastKey(),
+ pq.msgCache, cachedMessageSizeInMiB,
request.String(), pc.queueFlowControlTimes)
+ }
+ pc.queueFlowControlTimes++
+ sleepTime = _PullDelayTimeWhenFlowControl
+ goto NEXT
+ }
+
+ if !pc.consumeOrderly {
+ if pq.getMaxSpan() >
pc.option.ConsumeConcurrentlyMaxSpan {
+
+ if pc.queueMaxSpanFlowControlTimes%1000 == 0 {
+ rlog.Warnf("the queue's messages, span
too long, so do flow control, minOffset=%d, "+
+ "maxOffset=%d, maxSpan=%d,
pullRequest=%s, flowControlTimes=%d",
+ 0,
//processQueue.getMsgTreeMap().firstKey(),
+ 0, //
processQueue.getMsgTreeMap().lastKey(),
+ pq.getMaxSpan(),
+ request.String(),
pc.queueMaxSpanFlowControlTimes)
+ }
+ sleepTime = _PullDelayTimeWhenFlowControl
+ goto NEXT
+ }
+ } else {
+ if pq.locked {
+ if !request.lockedFirst {
+ offset :=
pc.computePullFromWhere(request.mq)
+ brokerBusy := offset <
request.nextOffset
+ rlog.Infof("the first time to pull
message, so fix offset from broker. "+
+ "pullRequest: [%s] NewOffset:
%d brokerBusy: %v",
+ request.String(), offset,
brokerBusy)
+ if brokerBusy {
+ rlog.Infof("[NOTIFY_ME]the
first time to pull message, but pull request offset"+
+ " larger than broker
consume offset. pullRequest: [%s] NewOffset: %d",
+ request.String(),
offset)
+ }
+
+ request.lockedFirst = true
+ request.nextOffset = offset
+ }
+ } else {
+ rlog.Infof("pull message later because not
locked in broker, [%s]", request.String())
+ sleepTime = _PullDelayTimeWhenError
+ goto NEXT
+ }
+ }
+
+ v, exist := pc.subscriptionDataTable.Load(request.mq.Topic)
+ if !exist {
+ rlog.Warnf("find the consumer's subscription failed,
%s", request.String())
+ sleepTime = _PullDelayTimeWhenError
+ goto NEXT
+ }
+ beginTime := time.Now()
+ var (
+ commitOffsetEnable bool
+ commitOffsetValue int64
+ subExpression string
+ )
+
+ if pc.model == Clustering {
+ commitOffsetValue = pc.storage.read(request.mq,
_ReadFromMemory)
+ if commitOffsetValue > 0 {
+ commitOffsetEnable = true
+ }
+ }
+
+ sd := v.(*kernel.SubscriptionData)
+ classFilter := sd.ClassFilterMode
+ if pc.option.PostSubscriptionWhenPull && classFilter {
+ subExpression = sd.SubString
+ }
+
+ sysFlag := buildSysFlag(commitOffsetEnable, true, subExpression
!= "", classFilter)
+
+ pullRequest := &kernel.PullMessageRequest{
+ ConsumerGroup: pc.consumerGroup,
+ Topic: request.mq.Topic,
+ QueueId: int32(request.mq.QueueId),
+ QueueOffset: request.nextOffset,
+ MaxMsgNums: pc.option.PullBatchSize,
+ SysFlag: sysFlag,
+ CommitOffset: 0,
+ SubExpression: _SubAll,
+ ExpressionType: string(TAG), // TODO
+ }
+ //
+ //if data.ExpType == string(TAG) {
+ // pullRequest.SubVersion = 0
+ //} else {
+ // pullRequest.SubVersion = data.SubVersion
+ //}
+
+ //ch := make(chan *kernel.PullResult)
+ brokerResult := tryFindBroker(request.mq)
+ if brokerResult == nil {
+ rlog.Warnf("no broker found for %s",
request.mq.String())
+ sleepTime = _PullDelayTimeWhenError
+ goto NEXT
+ }
+ result, err := pc.client.PullMessage(context.Background(),
brokerResult.BrokerAddr, pullRequest)
+ if err != nil {
+ rlog.Warnf("pull message from %s error: %s",
"127.0.0.1:10911", err.Error())
+ sleepTime = _PullDelayTimeWhenError
+ goto NEXT
+ }
+
+ if result.Status == kernel.PullBrokerTimeout {
+ rlog.Warnf("pull broker: %s timeout", "127.0.0.1:10911")
+ sleepTime = _PullDelayTimeWhenError
+ goto NEXT
+ }
+
+ switch result.Status {
+ case kernel.PullFound:
+ rlog.Debugf("Topic: %s, QueueId: %d found messages:
%d", request.mq.Topic, request.mq.QueueId,
+ len(result.GetMessageExts()))
+ prevRequestOffset := request.nextOffset
+ request.nextOffset = result.NextBeginOffset
+
+ rt := time.Now().Sub(beginTime)
+ increasePullRT(pc.consumerGroup, request.mq.Topic, rt)
+
+ msgFounded := result.GetMessageExts()
+ firstMsgOffset := int64(math.MaxInt64)
+ if msgFounded != nil && len(msgFounded) != 0 {
+ firstMsgOffset = msgFounded[0].QueueOffset
+ increasePullTPS(pc.consumerGroup,
request.mq.Topic, len(msgFounded))
+ pq.putMessage(msgFounded)
+ }
+ if result.NextBeginOffset < prevRequestOffset ||
firstMsgOffset < prevRequestOffset {
+ rlog.Warnf("[BUG] pull message result maybe
data wrong, [nextBeginOffset=%s, "+
+ "firstMsgOffset=%d,
prevRequestOffset=%d]", result.NextBeginOffset, firstMsgOffset,
prevRequestOffset)
+ }
+ case kernel.PullNoNewMsg:
+ rlog.Infof("Topic: %s, QueueId: %d, no more msg",
request.mq.Topic, request.mq.QueueId)
+ case kernel.PullNoMsgMatched:
+ request.nextOffset = result.NextBeginOffset
+ pc.correctTagsOffset(request)
+ case kernel.PullOffsetIllegal:
+ rlog.Warnf("the pull request offset illegal, {} {}",
request.String(), result.String())
+ request.nextOffset = result.NextBeginOffset
+ pq.dropped = true
+ go func() {
+ time.Sleep(10 * time.Second)
+ pc.storage.update(request.mq,
request.nextOffset, false)
+
pc.storage.persist([]*kernel.MessageQueue{request.mq})
+ pc.storage.remove(request.mq)
+ rlog.Warnf("fix the pull request offset: %s",
request.String())
+ }()
+ default:
+ rlog.Warnf("")
+ sleepTime = _PullDelayTimeWhenError
+ }
+ }
+}
+
+func (pc *pushConsumer) correctTagsOffset(pr *PullRequest) {
+ // TODO
+}
+
+func (pc *pushConsumer) sendMessageBack(ctx *ConsumeMessageContext, msg
*kernel.MessageExt) bool {
+ return true
+}
+
+type ConsumeMessageContext struct {
+ consumerGroup string
+ msgs []*kernel.MessageExt
+ mq *kernel.MessageQueue
+ success bool
+ status string
+ // mqTractContext
+ properties map[string]string
+}
+
+func (pc *pushConsumer) consumeMessageCurrently(pq *ProcessQueue, mq
*kernel.MessageQueue) {
+ msgs := pq.takeMessages(32)
+ if msgs == nil {
+ return
+ }
+ for count := 0; count < len(msgs); count++ {
+ var subMsgs []*kernel.MessageExt
+ if count+pc.option.ConsumeMessageBatchMaxSize > len(msgs) {
+ subMsgs = msgs[count:]
+ count = len(msgs)
+ } else {
+ next := count + pc.option.ConsumeMessageBatchMaxSize
+ subMsgs = msgs[count:next]
+ count = next
+ }
+ go func() {
+ RETRY:
+ if pq.dropped {
+ rlog.Infof("the message queue not be able to
consume, because it was dropped. group=%s, mq=%s",
+ pc.consumerGroup, mq.String())
+ return
+ }
+
+ ctx := &ConsumeMessageContext{
+ properties: make(map[string]string),
+ }
+ // TODO hook
+ beginTime := time.Now()
+ groupTopic := kernel.RetryGroupTopicPrefix +
pc.consumerGroup
+ for idx := range subMsgs {
+ msg := subMsgs[idx]
+ retryTopic :=
msg.Properties[kernel.PropertyRetryTopic]
+ if retryTopic == "" && groupTopic == msg.Topic {
+ msg.Topic = retryTopic
+ }
+
subMsgs[idx].Properties[kernel.PropertyConsumeStartTime] = strconv.FormatInt(
+
beginTime.UnixNano()/int64(time.Millisecond), 10)
+ }
+ result, err := pc.consume(ctx, subMsgs)
+ consumeRT := time.Now().Sub(beginTime)
+ if err != nil {
+ ctx.properties["ConsumeContextType"] =
"EXCEPTION"
+ } else if consumeRT >= pc.option.ConsumeTimeout {
+ ctx.properties["ConsumeContextType"] = "TIMEOUT"
+ } else if result == ConsumeSuccess {
+ ctx.properties["ConsumeContextType"] = "SUCCESS"
+ } else {
+ ctx.properties["ConsumeContextType"] =
"RECONSUME_LATER"
+ }
+
+ // TODO hook
+ increaseConsumeRT(pc.consumerGroup, mq.Topic, consumeRT)
+
+ if !pq.dropped {
+ msgBackFailed := make([]*kernel.MessageExt, 0)
+ if result == ConsumeSuccess {
+ increaseConsumeOKTPS(pc.consumerGroup,
mq.Topic, len(subMsgs))
+ } else {
+
increaseConsumeFailedTPS(pc.consumerGroup, mq.Topic, len(subMsgs))
+ if pc.model == BroadCasting {
+ for i := 0; i < len(msgs); i++ {
+
rlog.Warnf("BROADCASTING, the message=%s consume failed, drop it, {}",
subMsgs[i])
+ }
+ } else {
+ for i := 0; i < len(msgs); i++ {
+ msg := msgs[i]
+ if
!pc.sendMessageBack(ctx, msg) {
+
msg.ReconsumeTimes += 1
+ msgBackFailed =
append(msgBackFailed, msg)
+ }
+ }
+ }
+ }
+
+ offset := pq.removeMessage(len(subMsgs))
+
+ if offset >= 0 && !pq.dropped {
+ pc.storage.update(mq, int64(offset),
true)
+ }
+ if len(msgBackFailed) > 0 {
+ subMsgs = msgBackFailed
+ time.Sleep(5 * time.Second)
+ goto RETRY
+ }
+ } else {
+ rlog.Warnf("processQueue is dropped without
process consume result. messageQueue=%s, msgs=%+v",
+ mq, msgs)
+ }
+ }()
+ }
+}
+
+func (pc *pushConsumer) consumeMessageOrderly(pq *ProcessQueue, mq
*kernel.MessageQueue) {
+}
diff --git a/consumer/push_consumer_test.go b/consumer/push_consumer_test.go
new file mode 100644
index 0000000..f7bc454
--- /dev/null
+++ b/consumer/push_consumer_test.go
@@ -0,0 +1,18 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package consumer
diff --git a/consumer/statistics.go b/consumer/statistics.go
new file mode 100644
index 0000000..29045a0
--- /dev/null
+++ b/consumer/statistics.go
@@ -0,0 +1,60 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package consumer
+
+import "time"
+
+var (
+ topicAndGroupConsumeOKTPS = &statsItemSet{statsName:
"CONSUME_OK_TPS"}
+ topicAndGroupConsumeRT = &statsItemSet{statsName:
"CONSUME_FAILED_TPS"}
+ topicAndGroupConsumeFailedTPS = &statsItemSet{statsName: "CONSUME_RT"}
+ topicAndGroupPullTPS = &statsItemSet{statsName: "PULL_TPS"}
+ topicAndGroupPullRT = &statsItemSet{statsName: "PULL_RT"}
+)
+
+type statsItem struct {
+}
+
+type statsItemSet struct {
+ statsName string
+ statsItemTable map[string]statsItem
+}
+
+func (set *statsItemSet) addValue(key string, incValue, incTimes int) {
+
+}
+
+func increasePullRT(group, topic string, rt time.Duration) {
+
+}
+
+func increaseConsumeRT(group, topic string, rt time.Duration) {
+
+}
+
+func increasePullTPS(group, topic string, msgNumber int) {
+
+}
+
+func increaseConsumeOKTPS(group, topic string, msgNumber int) {
+
+}
+
+func increaseConsumeFailedTPS(group, topic string, msgNumber int) {
+
+}
diff --git a/consumer/strategy.go b/consumer/strategy.go
new file mode 100644
index 0000000..1cb6b2b
--- /dev/null
+++ b/consumer/strategy.go
@@ -0,0 +1,130 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package consumer
+
+import (
+ "github.com/apache/rocketmq-client-go/kernel"
+ "github.com/apache/rocketmq-client-go/rlog"
+ "github.com/apache/rocketmq-client-go/utils"
+)
+
+// Strategy Algorithm for message allocating between consumers
+type AllocateStrategy string
+
+const (
+ // An allocate strategy proxy for based on machine room nearside
priority. An actual allocate strategy can be
+ // specified.
+ //
+ // If any consumer is alive in a machine room, the message queue of the
broker which is deployed in the same machine
+ // should only be allocated to those. Otherwise, those message queues
can be shared along all consumers since there are
+ // no alive consumer to monopolize them.
+ StrategyMachineNearby = AllocateStrategy("MachineNearby")
+
+ // Average Hashing queue algorithm
+ StrategyAveragely = AllocateStrategy("Averagely")
+
+ // Cycle average Hashing queue algorithm
+ StrategyAveragelyCircle = AllocateStrategy("AveragelyCircle")
+
+ // Use Message Queue specified
+ StrategyConfig = AllocateStrategy("Config")
+
+ // Computer room Hashing queue algorithm, such as Alipay logic room
+ StrategyMachineRoom = AllocateStrategy("MachineRoom")
+
+ // Consistent Hashing queue algorithm
+ StrategyConsistentHash = AllocateStrategy("ConsistentHash")
+)
+
+func allocateByAveragely(consumerGroup, currentCID string, mqAll
[]*kernel.MessageQueue,
+ cidAll []string) []*kernel.MessageQueue {
+ if currentCID == "" || utils.IsArrayEmpty(mqAll) ||
utils.IsArrayEmpty(cidAll) {
+ return nil
+ }
+ var (
+ find bool
+ index int
+ )
+
+ for idx := range cidAll {
+ if cidAll[idx] == currentCID {
+ find = true
+ index = idx
+ break
+ }
+ }
+ if !find {
+ rlog.Infof("[BUG] ConsumerGroup=%s, ConsumerId=%s not in
cidAll:%+v", consumerGroup, currentCID, cidAll)
+ return nil
+ }
+
+ mqSize := len(mqAll)
+ cidSize := len(cidAll)
+ mod := mqSize % cidSize
+
+ var averageSize int
+ if mqSize <= cidSize {
+ averageSize = 1
+ } else {
+ if mod > 0 && index < mod {
+ averageSize = mqSize/cidSize + 1
+ } else {
+ averageSize = mqSize / cidSize
+ }
+ }
+
+ var startIndex int
+ if mod > 0 && index < mod {
+ startIndex = index * averageSize
+ } else {
+ startIndex = index*averageSize + mod
+ }
+
+ num := utils.MinInt(averageSize, mqSize-startIndex)
+ result := make([]*kernel.MessageQueue, num)
+ for i := 0; i < num; i++ {
+ result[i] = mqAll[(startIndex+i)%mqSize]
+ }
+ return result
+}
+
+// TODO
+func allocateByMachineNearby(consumerGroup, currentCID string, mqAll
[]*kernel.MessageQueue,
+ cidAll []string) []*kernel.MessageQueue {
+ return allocateByAveragely(consumerGroup, currentCID, mqAll, cidAll)
+}
+
+func allocateByAveragelyCircle(consumerGroup, currentCID string, mqAll
[]*kernel.MessageQueue,
+ cidAll []string) []*kernel.MessageQueue {
+ return allocateByAveragely(consumerGroup, currentCID, mqAll, cidAll)
+}
+
+func allocateByConfig(consumerGroup, currentCID string, mqAll
[]*kernel.MessageQueue,
+ cidAll []string) []*kernel.MessageQueue {
+ return allocateByAveragely(consumerGroup, currentCID, mqAll, cidAll)
+}
+
+func allocateByMachineRoom(consumerGroup, currentCID string, mqAll
[]*kernel.MessageQueue,
+ cidAll []string) []*kernel.MessageQueue {
+ return allocateByAveragely(consumerGroup, currentCID, mqAll, cidAll)
+}
+
+func allocateByConsistentHash(consumerGroup, currentCID string, mqAll
[]*kernel.MessageQueue,
+ cidAll []string) []*kernel.MessageQueue {
+ return allocateByAveragely(consumerGroup, currentCID, mqAll, cidAll)
+}
diff --git a/consumer_test.go b/consumer_test.go
deleted file mode 100644
index aaba3cb..0000000
--- a/consumer_test.go
+++ /dev/null
@@ -1,18 +0,0 @@
-package rocketmq
-
-import (
- "fmt"
- "testing"
-)
-
-func TestDefaultConsumer_Pull(t *testing.T) {
- consumer := NewConsumer(ConsumerConfig{
- GroupName: "testGroup",
- })
- consumer.Start()
- result, err := consumer.Pull("test", "*", 32)
- if err != nil {
- t.Fatal(err.Error())
- }
- fmt.Println(len(result.GetMessageExts()))
-}
diff --git a/examples/main.go b/examples/main.go
deleted file mode 100644
index 72a2a68..0000000
--- a/examples/main.go
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package main
-
-import (
- "github.com/apache/rocketmq-client-go/core"
- "gopkg.in/alecthomas/kingpin.v2"
- "os"
-)
-
-var (
- rmq = kingpin.New("rocketmq", "RocketMQ cmd tools")
- namesrv = rmq.Flag("namesrv", "NameServer
address.").Default("localhost:9876").Short('n').String()
- topic = rmq.Flag("topic", "topic
name.").Short('t').Required().String()
- gid = rmq.Flag("groupId", "group
Id").Short('g').Default("testGroup").String()
- amount = rmq.Flag("amount", "how many message to produce or
consume").Default("64").Short('a').Int()
-
- produce = rmq.Command("produce", "send messages to RocketMQ")
- body = produce.Flag("body", "message
body").Short('b').Required().String()
- workerCount = produce.Flag("workerCount", "works of send message with
orderly").Default("1").Short('w').Int()
- orderly = produce.Flag("orderly", "send msg
orderly").Short('o').Bool()
-
- consume = rmq.Command("consume", "consumes message from RocketMQ")
-)
-
-func main() {
- switch kingpin.MustParse(rmq.Parse(os.Args[1:])) {
- case produce.FullCommand():
- pConfig := &rocketmq.ProducerConfig{ClientConfig:
rocketmq.ClientConfig{
- GroupID: *gid,
- NameServer: *namesrv,
- LogC: &rocketmq.LogConfig{
- Path: "example",
- FileSize: 64 * 1 << 10,
- FileNum: 1,
- Level: rocketmq.LogLevelDebug,
- },
- }}
- if *orderly {
- sendMessageOrderly(pConfig)
- } else {
- sendMessage(pConfig)
- }
- case consume.FullCommand():
- cConfig := &rocketmq.PushConsumerConfig{ClientConfig:
rocketmq.ClientConfig{
- GroupID: *gid,
- NameServer: *namesrv,
- LogC: &rocketmq.LogConfig{
- Path: "example",
- FileSize: 64 * 1 << 10,
- FileNum: 1,
- Level: rocketmq.LogLevelInfo,
- },
- }}
-
- ConsumeWithPush(cConfig)
- }
-}
diff --git a/examples/producer.go b/examples/producer.go
deleted file mode 100644
index e1c4d2f..0000000
--- a/examples/producer.go
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package main
-
-import (
- "fmt"
- "github.com/apache/rocketmq-client-go/core"
-)
-
-func sendMessage(config *rocketmq.ProducerConfig) {
- producer, err := rocketmq.NewProducer(config)
-
- if err != nil {
- fmt.Println("create Producer failed, error:", err)
- return
- }
-
- err = producer.Start()
- if err != nil {
- fmt.Println("start producer error", err)
- return
- }
- defer producer.Shutdown()
-
- fmt.Printf("Producer: %s started... \n", producer)
- for i := 0; i < *amount; i++ {
- msg := fmt.Sprintf("%s-%d", *body, i)
- result, err :=
producer.SendMessageSync(&rocketmq.Message{Topic: *topic, Body: msg})
- if err != nil {
- fmt.Println("Error:", err)
- }
- fmt.Printf("send message: %s result: %s\n", msg, result)
- }
- fmt.Println("shutdown producer.")
-}
diff --git a/examples/producer/main.go b/examples/producer/main.go
new file mode 100644
index 0000000..5c12584
--- /dev/null
+++ b/examples/producer/main.go
@@ -0,0 +1,47 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package main
+
+import (
+ "fmt"
+ "github.com/apache/rocketmq-client-go/consumer"
+ "github.com/apache/rocketmq-client-go/kernel"
+ "os"
+ "time"
+)
+
+func main() {
+ c := consumer.NewPushConsumer("testGroup", consumer.ConsumerOption{
+ ConsumerModel: consumer.Clustering,
+ FromWhere: consumer.ConsumeFromFirstOffset,
+ })
+ err := c.Subscribe("testTopic", consumer.MessageSelector{}, func(ctx
*consumer.ConsumeMessageContext,
+ msgs []*kernel.MessageExt) (consumer.ConsumeResult, error) {
+ fmt.Println(msgs)
+ return consumer.ConsumeSuccess, nil
+ })
+ if err != nil {
+ fmt.Println(err.Error())
+ }
+ err = c.Start()
+ if err != nil {
+ fmt.Println(err.Error())
+ os.Exit(-1)
+ }
+ time.Sleep(time.Hour)
+}
diff --git a/examples/producer_orderly.go b/examples/producer_orderly.go
deleted file mode 100644
index 9943f5b..0000000
--- a/examples/producer_orderly.go
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package main
-
-import (
- "fmt"
- "sync"
- "sync/atomic"
-
- "github.com/apache/rocketmq-client-go/core"
-)
-
-type queueSelectorByOrderID struct{}
-
-func (s queueSelectorByOrderID) Select(size int, m *rocketmq.Message, arg
interface{}) int {
- return arg.(int) % size
-}
-
-type worker struct {
- p rocketmq.Producer
- leftMsgCount int64
-}
-
-func (w *worker) run() {
- selector := queueSelectorByOrderID{}
- for atomic.AddInt64(&w.leftMsgCount, -1) >= 0 {
- r, err := w.p.SendMessageOrderly(
- &rocketmq.Message{Topic: *topic, Body: *body},
selector, 7 /*orderID*/, 3,
- )
- if err != nil {
- println("Send Orderly Error:", err)
- }
- fmt.Printf("send orderly result:%+v\n", r)
- }
-}
-
-func sendMessageOrderly(config *rocketmq.ProducerConfig) {
- producer, err := rocketmq.NewProducer(config)
- if err != nil {
- fmt.Println("create Producer failed, error:", err)
- return
- }
-
- producer.Start()
- defer producer.Shutdown()
-
- wg := sync.WaitGroup{}
- wg.Add(*workerCount)
-
- workers := make([]worker, *workerCount)
- for i := range workers {
- workers[i].p = producer
- workers[i].leftMsgCount = (int64)(*amount)
- }
-
- for i := range workers {
- go func(w *worker) { w.run(); wg.Done() }(&workers[i])
- }
-
- wg.Wait()
-}
diff --git a/examples/pull_consumer.go b/examples/pull_consumer.go
deleted file mode 100644
index 1b209c0..0000000
--- a/examples/pull_consumer.go
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package main
-
-import (
- "fmt"
- "time"
-
- "github.com/apache/rocketmq-client-go/core"
-)
-
-func ConsumeWithPull(config *rocketmq.PullConsumerConfig, topic string) {
-
- consumer, err := rocketmq.NewPullConsumer(config)
- if err != nil {
- fmt.Printf("new pull consumer error:%s\n", err)
- return
- }
-
- err = consumer.Start()
- if err != nil {
- fmt.Printf("start consumer error:%s\n", err)
- return
- }
- defer consumer.Shutdown()
-
- mqs := consumer.FetchSubscriptionMessageQueues(topic)
- fmt.Printf("fetch subscription mqs:%+v\n", mqs)
-
- total, offsets, now := 0, map[int]int64{}, time.Now()
-
-PULL:
- for {
- for _, mq := range mqs {
- pr := consumer.Pull(mq, "*", offsets[mq.ID], 32)
- total += len(pr.Messages)
- fmt.Printf("pull %s, result:%+v\n", mq.String(), pr)
-
- switch pr.Status {
- case rocketmq.PullNoNewMsg:
- break PULL
- case rocketmq.PullFound:
- fallthrough
- case rocketmq.PullNoMatchedMsg:
- fallthrough
- case rocketmq.PullOffsetIllegal:
- offsets[mq.ID] = pr.NextBeginOffset
- case rocketmq.PullBrokerTimeout:
- fmt.Println("broker timeout occur")
- }
- }
- }
-
- var timePerMessage time.Duration
- if total > 0 {
- timePerMessage = time.Since(now) / time.Duration(total)
- }
- fmt.Printf("total message:%d, per message time:%d\n", total,
timePerMessage)
-}
diff --git a/examples/push_consumer.go b/examples/push_consumer.go
deleted file mode 100644
index 38e434c..0000000
--- a/examples/push_consumer.go
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package main
-
-import (
- "fmt"
- "github.com/apache/rocketmq-client-go/core"
- "sync/atomic"
-)
-
-func ConsumeWithPush(config *rocketmq.PushConsumerConfig) {
-
- consumer, err := rocketmq.NewPushConsumer(config)
- if err != nil {
- println("create Consumer failed, error:", err)
- return
- }
-
- ch := make(chan interface{})
- var count = (int64)(*amount)
- // MUST subscribe topic before consumer started.
- consumer.Subscribe(*topic, "*", func(msg *rocketmq.MessageExt)
rocketmq.ConsumeStatus {
- fmt.Printf("A message received: \"%s\" \n", msg.Body)
- if atomic.AddInt64(&count, -1) <= 0 {
- ch <- "quit"
- }
- return rocketmq.ConsumeSuccess
- })
-
- err = consumer.Start()
- if err != nil {
- println("consumer start failed,", err)
- return
- }
-
- fmt.Printf("consumer: %s started...\n", consumer)
- <-ch
- err = consumer.Shutdown()
- if err != nil {
- println("consumer shutdown failed")
- return
- }
- println("consumer has shutdown.")
-}
diff --git a/go.mod b/go.mod
index 2701b7c..58ca7c7 100644
--- a/go.mod
+++ b/go.mod
@@ -5,6 +5,7 @@ go 1.11
require (
github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc //
indirect
github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf //
indirect
+ github.com/emirpasic/gods v1.12.0
github.com/sirupsen/logrus v1.3.0
github.com/stretchr/testify v1.3.0
github.com/tidwall/gjson v1.2.1
diff --git a/go.sum b/go.sum
index 7a45ece..85b0d5e 100644
--- a/go.sum
+++ b/go.sum
@@ -5,6 +5,8 @@ github.com/alecthomas/units
v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRF
github.com/davecgh/go-spew v1.1.0/go.mod
h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1
h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod
h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
+github.com/emirpasic/gods v1.12.0
h1:QAUIPSaCu4G+POclxeqb3F+WPpdKqFGlw36+yOzGlrg=
+github.com/emirpasic/gods v1.12.0/go.mod
h1:YfzfFFoVP/catgzJb4IKIqXjX78Ha8FMSDh3ymbK86o=
github.com/konsorten/go-windows-terminal-sequences v1.0.1
h1:mweAR1A6xJ3oS2pRaGiHgQ4OO8tzTaLawm8vnODuwDk=
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod
h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/pmezard/go-difflib v1.0.0
h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
diff --git a/kernel/client.go b/kernel/client.go
index f9185ac..e83d598 100644
--- a/kernel/client.go
+++ b/kernel/client.go
@@ -23,34 +23,58 @@ import (
"fmt"
"github.com/apache/rocketmq-client-go/remote"
"github.com/apache/rocketmq-client-go/rlog"
- "github.com/apache/rocketmq-client-go/utils"
"os"
"strconv"
+ "strings"
"sync"
"time"
)
const (
defaultTraceRegionID = "DefaultRegion"
- tranceOff = "false"
-)
-var (
- namesrvAddrs = os.Getenv("rocketmq.namesrv.addr")
- clientIP = utils.LocalIP()
- instanceName = os.Getenv("rocketmq.client.name")
- pollNameServerInterval = 30 * time.Second
- heartbeatBrokerInterval = 30 * time.Second
- persistConsumerOffsetInterval = 5 * time.Second
- unitMode = false
- vipChannelEnabled, _ =
strconv.ParseBool(os.Getenv("com.rocketmq.sendMessageWithVIPChannel"))
- clientID = string(clientIP) + "@" + instanceName
+ // tracing message switch
+ _TranceOff = "false"
+
+ // Pulling topic information interval from the named server
+ _PullNameServerInterval = 30 * time.Second
+
+ // Pulling topic information interval from the named server
+ _HeartbeatBrokerInterval = 30 * time.Second
+
+ // Offset persistent interval for consumer
+ _PersistOffset = 5 * time.Second
+
+ // Rebalance interval
+ _RebalanceInterval = 20 * time.Millisecond
)
var (
- ErrServiceState = errors.New("service state is not Running, please
check")
+ ErrServiceState = errors.New("service state is not running, please
check")
)
+type ClientOption struct {
+ NameServerAddr string
+ ClientIP string
+ InstanceName string
+ UnitMode bool
+ UnitName string
+ VIPChannelEnabled bool
+ UseTLS bool
+}
+
+func (opt *ClientOption) ChangeInstanceNameToPID() {
+ if opt.InstanceName == "DEFAULT" {
+ opt.InstanceName = strconv.Itoa(os.Getegid())
+ }
+}
+
+func (opt *ClientOption) String() string {
+ return fmt.Sprintf("ClientOption [NameServerAddr=%s, ClientIP=%s,
InstanceName=%s, "+
+ "UnitMode=%v, UnitName=%s, VIPChannelEnabled=%v, UseTLS=%v]",
opt.NameServerAddr, opt.ClientIP,
+ opt.InstanceName, opt.UnitMode, opt.UnitName,
opt.VIPChannelEnabled, opt.UseTLS)
+}
+
type InnerProducer interface {
PublishTopicList() []string
UpdateTopicPublishInfo(topic string, info *TopicPublishInfo)
@@ -62,15 +86,169 @@ type InnerProducer interface {
}
type InnerConsumer interface {
- DoRebalance()
PersistConsumerOffset()
UpdateTopicSubscribeInfo(topic string, mqs []*MessageQueue)
IsSubscribeTopicNeedUpdate(topic string) bool
+ SubscriptionDataList() []*SubscriptionData
+ Rebalance()
IsUnitMode() bool
}
+type RMQClient struct {
+ option ClientOption
+ // group -> InnerProducer
+ producerMap sync.Map
+
+ // group -> InnerConsumer
+ consumerMap sync.Map
+}
+
+var clientMap sync.Map
+
+func GetOrNewRocketMQClient(option ClientOption) *RMQClient {
+ // TODO
+ return &RMQClient{option: option}
+}
+
+func (c *RMQClient) Start() {
+ // TODO fetchNameServerAddr
+ go func() {}()
+
+ // schedule update route info
+ go func() {
+ // delay
+ time.Sleep(50 * time.Millisecond)
+ for {
+ c.UpdateTopicRouteInfo()
+ time.Sleep(_PullNameServerInterval)
+ }
+ }()
+
+ // TODO cleanOfflineBroker & sendHeartbeatToAllBrokerWithLock
+ go func() {}()
+
+ // schedule persist offset
+ go func() {
+ time.Sleep(10 * time.Second)
+ for {
+ c.consumerMap.Range(func(key, value interface{}) bool {
+ consumer := value.(InnerConsumer)
+ consumer.PersistConsumerOffset()
+ return true
+ })
+ time.Sleep(_PersistOffset)
+ }
+ }()
+
+ go func() {
+ for {
+ c.RebalanceImmediately()
+ time.Sleep(time.Second)
+ }
+ }()
+}
+
+func (c *RMQClient) ClientID() string {
+ //id := c.option.ClientIP + "@" + c.option.InstanceName
+ //if c.option.UnitName != "" {
+ // id += "@" + c.option.UnitName
+ //}
+ return "127.0.0.1:10911@DEFAULT"
+}
+
+func (c *RMQClient) CheckClientInBroker() {
+
+}
+
+func (c *RMQClient) SendHeartbeatToAllBrokerWithLock() {
+ hbData := &heartbeatData{
+ ClientId: c.ClientID(),
+ }
+ pData := make([]producerData, 0)
+ c.producerMap.Range(func(key, value interface{}) bool {
+ pData = append(pData, producerData(key.(string)))
+ return true
+ })
+
+ cData := make([]consumerData, 0)
+ c.consumerMap.Range(func(key, value interface{}) bool {
+ consumer := value.(InnerConsumer)
+ cData = append(cData, consumerData{
+ GroupName: key.(string),
+ CType: "PUSH",
+ MessageModel: "CLUSTERING",
+ Where: "CONSUME_FROM_FIRST_OFFSET",
+ UnitMode: consumer.IsUnitMode(),
+ SubscriptionDatas: consumer.SubscriptionDataList(),
+ })
+ return true
+ })
+ hbData.ProducerDatas = pData
+ hbData.ConsumerDatas = cData
+ if len(pData) == 0 && len(cData) == 0 {
+ rlog.Warn("sending heartbeat, but no consumer and no producer")
+ return
+ }
+ brokerAddressesMap.Range(func(key, value interface{}) bool {
+ brokerName := key.(string)
+ data := value.(*BrokerData)
+ for id, addr := range data.BrokerAddresses {
+ cmd := remote.NewRemotingCommand(ReqHeartBeat, nil,
hbData.encode())
+ response, err := remote.InvokeSync(addr, cmd,
3*time.Second)
+ if err != nil {
+ rlog.Warnf("send heart beat to broker error:
%s", err.Error())
+ return true
+ }
+ if response.Code == ResSuccess {
+ v, exist := brokerVersionMap.Load(brokerName)
+ var m map[string]int32
+ if exist {
+ m = v.(map[string]int32)
+ } else {
+ m = make(map[string]int32, 4)
+ brokerVersionMap.Store(brokerName, m)
+ }
+ m[brokerName] = int32(response.Version)
+ rlog.Infof("send heart beat to broker[%s %s %s]
success", brokerName, id, addr)
+ }
+ }
+ return true
+ })
+}
+
+func (c *RMQClient) UpdateTopicRouteInfo() {
+ publishTopicSet := make(map[string]bool, 0)
+ c.producerMap.Range(func(key, value interface{}) bool {
+ producer := value.(InnerProducer)
+ list := producer.PublishTopicList()
+ for idx := range list {
+ publishTopicSet[list[idx]] = true
+ }
+ return true
+ })
+ for topic := range publishTopicSet {
+ c.UpdatePublishInfo(topic, UpdateTopicRouteInfo(topic))
+ }
+
+ subscribedTopicSet := make(map[string]bool, 0)
+ c.consumerMap.Range(func(key, value interface{}) bool {
+ consumer := value.(InnerConsumer)
+ list := consumer.SubscriptionDataList()
+ for idx := range list {
+ if !strings.HasPrefix(list[idx].Topic,
RetryGroupTopicPrefix) {
+ subscribedTopicSet[list[idx].Topic] = true
+ }
+ }
+ return true
+ })
+
+ for topic := range subscribedTopicSet {
+ c.UpdateSubscribeInfo(topic, UpdateTopicRouteInfo(topic))
+ }
+}
+
// SendMessage with batch by sync
-func SendMessageSync(ctx context.Context, brokerAddrs, brokerName string,
request *SendMessageRequest,
+func (c *RMQClient) SendMessageSync(ctx context.Context, brokerAddrs,
brokerName string, request *SendMessageRequest,
msgs []*Message) (*SendResult, error) {
cmd := remote.NewRemotingCommand(ReqSendBatchMessage, request,
encodeMessages(msgs))
response, err := remote.InvokeSync(brokerAddrs, cmd, 3*time.Second)
@@ -79,16 +257,16 @@ func SendMessageSync(ctx context.Context, brokerAddrs,
brokerName string, reques
return nil, err
}
- return processSendResponse(brokerName, msgs, response), nil
+ return c.processSendResponse(brokerName, msgs, response), nil
}
// SendMessageAsync send message with batch by async
-func SendMessageAsync(ctx context.Context, brokerAddrs, brokerName string,
request *SendMessageRequest,
+func (c *RMQClient) SendMessageAsync(ctx context.Context, brokerAddrs,
brokerName string, request *SendMessageRequest,
msgs []*Message, f func(result *SendResult)) error {
return nil
}
-func SendMessageOneWay(ctx context.Context, brokerAddrs string, request
*SendMessageRequest,
+func (c *RMQClient) SendMessageOneWay(ctx context.Context, brokerAddrs string,
request *SendMessageRequest,
msgs []*Message) (*SendResult, error) {
cmd := remote.NewRemotingCommand(ReqSendBatchMessage, request,
encodeMessages(msgs))
err := remote.InvokeOneWay(brokerAddrs, cmd)
@@ -98,7 +276,7 @@ func SendMessageOneWay(ctx context.Context, brokerAddrs
string, request *SendMes
return nil, err
}
-func processSendResponse(brokerName string, msgs []*Message, cmd
*remote.RemotingCommand) *SendResult {
+func (c *RMQClient) processSendResponse(brokerName string, msgs []*Message,
cmd *remote.RemotingCommand) *SendResult {
var status SendStatus
switch cmd.Code {
case ResFlushDiskTimeout:
@@ -118,11 +296,11 @@ func processSendResponse(brokerName string, msgs
[]*Message, cmd *remote.Remotin
msgIDs := make([]string, 0)
for i := 0; i < len(msgs); i++ {
- msgIDs = append(msgIDs,
msgs[i].Properties[UniqueClientMessageIdKeyIndex])
+ msgIDs = append(msgIDs,
msgs[i].Properties[PropertyUniqueClientMessageIdKeyIndex])
}
- regionId := cmd.ExtFields[MsgRegion]
- trace := cmd.ExtFields[TraceSwitch]
+ regionId := cmd.ExtFields[PropertyMsgRegion]
+ trace := cmd.ExtFields[PropertyTraceSwitch]
if regionId == "" {
regionId = defaultTraceRegionID
@@ -140,23 +318,22 @@ func processSendResponse(brokerName string, msgs
[]*Message, cmd *remote.Remotin
QueueOffset: sendResponse.QueueOffset,
TransactionID: sendResponse.TransactionId,
RegionID: regionId,
- TraceOn: trace != "" && trace != tranceOff,
+ TraceOn: trace != "" && trace != _TranceOff,
}
}
// PullMessage with sync
-func PullMessage(ctx context.Context, brokerAddrs string, request
*PullMessageRequest) (*PullResult, error) {
+func (c *RMQClient) PullMessage(ctx context.Context, brokerAddrs string,
request *PullMessageRequest) (*PullResult, error) {
cmd := remote.NewRemotingCommand(ReqPullMessage, request, nil)
-
res, err := remote.InvokeSync(brokerAddrs, cmd, 3*time.Second)
if err != nil {
return nil, err
}
- return processPullResponse(res)
+ return c.processPullResponse(res)
}
-func processPullResponse(response *remote.RemotingCommand) (*PullResult,
error) {
+func (c *RMQClient) processPullResponse(response *remote.RemotingCommand)
(*PullResult, error) {
pullResult := &PullResult{}
switch response.Code {
case ResSuccess:
@@ -164,7 +341,7 @@ func processPullResponse(response *remote.RemotingCommand)
(*PullResult, error)
case ResPullNotFound:
pullResult.Status = PullNoNewMsg
case ResPullRetryImmediately:
- pullResult.Status = PullNoMatchedMsg
+ pullResult.Status = PullNoMsgMatched
case ResPullOffsetMoved:
pullResult.Status = PullOffsetIllegal
default:
@@ -197,70 +374,128 @@ func processPullResponse(response
*remote.RemotingCommand) (*PullResult, error)
}
// PullMessageAsync pull message async
-func PullMessageAsync(ctx context.Context, brokerAddrs string, request
*PullMessageRequest, f func(result *PullResult)) error {
+func (c *RMQClient) PullMessageAsync(ctx context.Context, brokerAddrs string,
request *PullMessageRequest, f func(result *PullResult)) error {
return nil
}
// QueryMaxOffset with specific queueId and topic
-func QueryMaxOffset(topic string, queueId int) error {
- return nil
+func QueryMaxOffset(topic string, queueId int) (int64, error) {
+ return 0, nil
}
// QueryConsumerOffset with specific queueId and topic of consumerGroup
-func QueryConsumerOffset(consumerGroup, topic string, queue int) (int64,
error) {
+func (c *RMQClient) QueryConsumerOffset(consumerGroup, topic string, queue
int) (int64, error) {
return 0, nil
}
// SearchOffsetByTimestamp with specific queueId and topic
-func SearchOffsetByTimestamp(topic string, queue int, timestamp int64) (int64,
error) {
+func (c *RMQClient) SearchOffsetByTimestamp(topic string, queue int, timestamp
int64) (int64, error) {
return 0, nil
}
// UpdateConsumerOffset with specific queueId and topic
-func UpdateConsumerOffset(consumerGroup, topic string, queue int, offset
int64) error {
+func (c *RMQClient) UpdateConsumerOffset(consumerGroup, topic string, queue
int, offset int64) error {
return nil
}
-var (
- // group -> InnerProducer
- producerMap sync.Map
-
- // group -> InnerConsumer
- consumerMap sync.Map
-)
-
-func CheckClientInBroker() {
+func (c *RMQClient) RegisterConsumer(group string, consumer InnerConsumer)
error {
+ c.consumerMap.Store(group, consumer)
+ return nil
+}
+func (c *RMQClient) UnregisterConsumer(group string) {
}
-func RegisterConsumer(group string, consumer InnerConsumer) {
+func (c *RMQClient) RegisterProducer(group string, producer InnerProducer) {
+}
+func (c *RMQClient) UnregisterProducer(group string) {
}
-func UnregisterConsumer(group string) {
+func (c *RMQClient) SelectProducer(group string) InnerProducer {
+ return nil
+}
+func (c *RMQClient) SelectConsumer(group string) InnerConsumer {
+ return nil
}
-func RegisterProducer(group string, producer InnerProducer) {
+func (c *RMQClient) RebalanceImmediately() {
+ c.consumerMap.Range(func(key, value interface{}) bool {
+ consumer := value.(InnerConsumer)
+ consumer.Rebalance()
+ return true
+ })
+}
+func (c *RMQClient) UpdatePublishInfo(topic string, data *TopicRouteData) {
+ if !c.isNeedUpdatePublishInfo(topic) {
+ return
+ }
+ c.producerMap.Range(func(key, value interface{}) bool {
+ consumer := value.(InnerProducer)
+ publishInfo := routeData2PublishInfo(topic, data)
+ publishInfo.HaveTopicRouterInfo = true
+ consumer.UpdateTopicPublishInfo(topic, publishInfo)
+ return true
+ })
}
-func UnregisterProducer(group string) {
+func (c *RMQClient) isNeedUpdatePublishInfo(topic string) bool {
+ var result bool
+ c.producerMap.Range(func(key, value interface{}) bool {
+ consumer := value.(InnerProducer)
+ if consumer.IsPublishTopicNeedUpdate(topic) {
+ result = true
+ return false
+ }
+ return true
+ })
+ return result
+}
+func (c *RMQClient) UpdateSubscribeInfo(topic string, data *TopicRouteData) {
+ if !c.isNeedUpdateSubscribeInfo(topic) {
+ return
+ }
+ c.consumerMap.Range(func(key, value interface{}) bool {
+ consumer := value.(InnerConsumer)
+ // TODO
+ consumer.UpdateTopicSubscribeInfo(topic,
routeData2SubscribeInfo(topic, data))
+ return true
+ })
}
-func SelectProducer(group string) InnerProducer {
- return nil
+func (c *RMQClient) isNeedUpdateSubscribeInfo(topic string) bool {
+ var result bool
+ c.consumerMap.Range(func(key, value interface{}) bool {
+ consumer := value.(InnerConsumer)
+ if consumer.IsSubscribeTopicNeedUpdate(topic) {
+ result = true
+ return false
+ }
+ return true
+ })
+ return result
}
-func SelectConsumer(group string) InnerConsumer {
- return nil
+func routeData2SubscribeInfo(topic string, data *TopicRouteData)
[]*MessageQueue {
+ list := make([]*MessageQueue, 0)
+ for idx := range data.QueueDataList {
+ qd := data.QueueDataList[idx]
+ if queueIsReadable(qd.Perm) {
+ for i := 0; i < qd.ReadQueueNums; i++ {
+ list = append(list, &MessageQueue{
+ Topic: topic,
+ BrokerName: qd.BrokerName,
+ QueueId: i,
+ })
+ }
+ }
+ }
+ return list
}
func encodeMessages(message []*Message) []byte {
return nil
}
-
-func sendHeartbeatToAllBroker() {
-
-}
diff --git a/kernel/client_test.go b/kernel/client_test.go
new file mode 100644
index 0000000..7a9723c
--- /dev/null
+++ b/kernel/client_test.go
@@ -0,0 +1,45 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package kernel
+
+import (
+ "context"
+ "testing"
+)
+
+func TestRMQClient_PullMessage(t *testing.T) {
+ client := GetOrNewRocketMQClient(ClientOption{})
+ req := &PullMessageRequest{
+ ConsumerGroup: "testGroup",
+ Topic: "wenfeng",
+ QueueId: 0,
+ QueueOffset: 0,
+ MaxMsgNums: 32,
+ SysFlag: 0x1 << 2,
+ SubExpression: "*",
+ ExpressionType: "TAG",
+ }
+ res, err := client.PullMessage(context.Background(), "127.0.0.1:10911",
req)
+ if err != nil {
+ t.Fatal(err.Error())
+ }
+
+ for _, a := range res.GetMessageExts() {
+ t.Log(string(a.Body))
+ }
+}
diff --git a/kernel/constants.go b/kernel/constants.go
new file mode 100644
index 0000000..a9eacac
--- /dev/null
+++ b/kernel/constants.go
@@ -0,0 +1,27 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package kernel
+
+const (
+ RetryGroupTopicPrefix = "%RETRY%"
+ DefaultConsumerGroup = "DEFAULT_CONSUMER"
+)
+
+func GetRetryTopic(group string) string {
+ return RetryGroupTopicPrefix + group
+}
diff --git a/kernel/message.go b/kernel/message.go
index 843a743..117d766 100644
--- a/kernel/message.go
+++ b/kernel/message.go
@@ -20,32 +20,32 @@ package kernel
import "fmt"
const (
- KeySeparator = " "
- Keys = "KEYS"
- Tags = "TAGS"
- WaitStoreMsgOk = "WAIT"
- DelayTimeLevel = "DELAY"
- RetryTopic = "RETRY_TOPIC"
- RealTopic = "REAL_TOPIC"
- RealQueueId = "REAL_QID"
- TransactionPrepared = "TRAN_MSG"
- ProducerGroup = "PGROUP"
- MinOffset = "MIN_OFFSET"
- MaxOffset = "MAX_OFFSET"
- BuyerId = "BUYER_ID"
- OriginMessageId = "ORIGIN_MESSAGE_ID"
- TransferFlag = "TRANSFER_FLAG"
- CorrectionFlag = "CORRECTION_FLAG"
- MQ2Flag = "MQ2_FLAG"
- ReconsumeTime = "RECONSUME_TIME"
- MsgRegion = "MSG_REGION"
- TraceSwitch = "TRACE_ON"
- UniqueClientMessageIdKeyIndex = "UNIQ_KEY"
- MaxReconsumeTimes = "MAX_RECONSUME_TIMES"
- ConsumeStartTime = "CONSUME_START_TIME"
- TranscationPreparedQueueOffset = "TRAN_PREPARED_QUEUE_OFFSET"
- TranscationCheckTimes = "TRANSACTION_CHECK_TIMES"
- CheckImmunityTimeInSeconds = "CHECK_IMMUNITY_TIME_IN_SECONDS"
+ PropertyKeySeparator = " "
+ PropertyKeys = "KEYS"
+ PropertyTags = "TAGS"
+ PropertyWaitStoreMsgOk = "WAIT"
+ PropertyDelayTimeLevel = "DELAY"
+ PropertyRetryTopic = "RETRY_TOPIC"
+ PropertyRealTopic = "REAL_TOPIC"
+ PropertyRealQueueId = "REAL_QID"
+ PropertyTransactionPrepared = "TRAN_MSG"
+ PropertyProducerGroup = "PGROUP"
+ PropertyMinOffset = "MIN_OFFSET"
+ PropertyMaxOffset = "MAX_OFFSET"
+ PropertyBuyerId = "BUYER_ID"
+ PropertyOriginMessageId = "ORIGIN_MESSAGE_ID"
+ PropertyTransferFlag = "TRANSFER_FLAG"
+ PropertyCorrectionFlag = "CORRECTION_FLAG"
+ PropertyMQ2Flag = "MQ2_FLAG"
+ PropertyReconsumeTime = "RECONSUME_TIME"
+ PropertyMsgRegion = "MSG_REGION"
+ PropertyTraceSwitch = "TRACE_ON"
+ PropertyUniqueClientMessageIdKeyIndex = "UNIQ_KEY"
+ PropertyMaxReconsumeTimes = "MAX_RECONSUME_TIMES"
+ PropertyConsumeStartTime = "CONSUME_START_TIME"
+ PropertyTranscationPreparedQueueOffset = "TRAN_PREPARED_QUEUE_OFFSET"
+ PropertyTranscationCheckTimes = "TRANSACTION_CHECK_TIMES"
+ PropertyCheckImmunityTimeInSeconds =
"CHECK_IMMUNITY_TIME_IN_SECONDS"
)
type Message struct {
@@ -106,7 +106,7 @@ type MessageExt struct {
}
func (msgExt *MessageExt) GetTags() string {
- return msgExt.Properties[Tags]
+ return msgExt.Properties[PropertyTags]
}
func (msgExt *MessageExt) String() string {
diff --git a/kernel/model.go b/kernel/model.go
index f9593ab..d9f4d00 100644
--- a/kernel/model.go
+++ b/kernel/model.go
@@ -20,7 +20,9 @@ package kernel
import (
"bytes"
"encoding/binary"
+ "encoding/json"
"fmt"
+ "github.com/apache/rocketmq-client-go/rlog"
"github.com/apache/rocketmq-client-go/utils"
)
@@ -62,7 +64,7 @@ type PullStatus int
const (
PullFound PullStatus = iota
PullNoNewMsg
- PullNoMatchedMsg
+ PullNoMsgMatched
PullOffsetIllegal
PullBrokerTimeout
)
@@ -92,6 +94,10 @@ func (result *PullResult) GetMessages() []*Message {
return toMessages(result.messageExts)
}
+func (result *PullResult) String() string {
+ return ""
+}
+
func decodeMessage(data []byte) []*MessageExt {
msgs := make([]*MessageExt, 0)
buf := bytes.NewBuffer(data)
@@ -224,10 +230,15 @@ func (mq *MessageQueue) HashCode() int {
return result
}
+func (mq *MessageQueue) Equals(queue *MessageQueue) bool {
+ // TODO
+ return true
+}
+
type FindBrokerResult struct {
BrokerAddr string
Slave bool
- BrokerVersion int
+ BrokerVersion int32
}
type (
@@ -236,90 +247,16 @@ type (
consumeType string
- MessageModel int
- ConsumeFromWhere int
- ServiceState int
+ ServiceState int
)
const (
- ConsumeActively = consumeType("PULL")
- ConsumePassively = consumeType("PUSH")
-
- BroadCasting = MessageModel(1)
- Clustering = MessageModel(2)
-
- ConsumeFromLastOffset ConsumeFromWhere = iota
- ConsumeFromFirstOffset
- ConsumeFromTimestamp
-
- CreateJust ServiceState = iota
- Running
- Shutdown
+ StateCreateJust ServiceState = iota
+ StateStartFailed
+ StateRunning
+ StateShutdown
)
-func (mode MessageModel) String() string {
- switch mode {
- case BroadCasting:
- return "BroadCasting"
- case Clustering:
- return "Clustering"
- default:
- return "Unknown"
- }
-}
-
-type ExpressionType string
-
-const (
- /**
- * <ul>
- * Keywords:
- * <li>{@code AND, OR, NOT, BETWEEN, IN, TRUE, FALSE, IS, NULL}</li>
- * </ul>
- * <p/>
- * <ul>
- * Data type:
- * <li>Boolean, like: TRUE, FALSE</li>
- * <li>String, like: 'abc'</li>
- * <li>Decimal, like: 123</li>
- * <li>Float number, like: 3.1415</li>
- * </ul>
- * <p/>
- * <ul>
- * Grammar:
- * <li>{@code AND, OR}</li>
- * <li>{@code >, >=, <, <=, =}</li>
- * <li>{@code BETWEEN A AND B}, equals to {@code >=A AND <=B}</li>
- * <li>{@code NOT BETWEEN A AND B}, equals to {@code >B OR <A}</li>
- * <li>{@code IN ('a', 'b')}, equals to {@code ='a' OR ='b'}, this
operation only support String type.</li>
- * <li>{@code IS NULL}, {@code IS NOT NULL}, check parameter whether is
null, or not.</li>
- * <li>{@code =TRUE}, {@code =FALSE}, check parameter whether is true,
or false.</li>
- * </ul>
- * <p/>
- * <p>
- * Example:
- * (a > 10 AND a < 100) OR (b IS NOT NULL AND b=TRUE)
- * </p>
- */
- SQL92 = ExpressionType("SQL92")
-
- /**
- * Only support or operation such as
- * "tag1 || tag2 || tag3", <br>
- * If null or * expression, meaning subscribe all.
- */
- TAG = ExpressionType("TAG")
-)
-
-func IsTagType(exp string) bool {
- if exp == "" || exp == "TAG" {
- return true
- }
- return false
-}
-
-var SubAll = "*"
-
type SubscriptionData struct {
ClassFilterMode bool
Topic string
@@ -327,20 +264,30 @@ type SubscriptionData struct {
Tags map[string]bool
Codes map[int32]bool
SubVersion int64
- ExpType ExpressionType
+ ExpType string
}
type consumerData struct {
- groupName string
- cType consumeType
- messageModel MessageModel
- where ConsumeFromWhere
- subscriptionDatas []SubscriptionData
- unitMode bool
+ GroupName string `json:"groupName"`
+ CType consumeType `json:"consumeType"`
+ MessageModel string `json:"messageModel"`
+ Where string `json:"consumeFromWhere"`
+ SubscriptionDatas []*SubscriptionData `json:"subscriptionDataSet"`
+ UnitMode bool `json:"unitMode"`
}
type heartbeatData struct {
- clientId string
- producerDatas []producerData
- consumerDatas []consumerData
+ ClientId string `json:"clientID"`
+ ProducerDatas []producerData `json:"producerDataSet"`
+ ConsumerDatas []consumerData `json:"consumerDataSet"`
+}
+
+func (data *heartbeatData) encode() []byte {
+ d, err := json.Marshal(data)
+ if err != nil {
+ rlog.Errorf("marshal heartbeatData error: %s", err.Error())
+ return nil
+ }
+ rlog.Info(string(d))
+ return d
}
diff --git a/kernel/perm.go b/kernel/perm.go
index e78a876..2d568d5 100644
--- a/kernel/perm.go
+++ b/kernel/perm.go
@@ -1,19 +1,19 @@
/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file dqueueIstributed with
- * thqueueIs work for additional information regarding copyright ownership.
- * The ASF licenses thqueueIs file to You under the Apache License, Version 2.0
- * (the "License"); you may not use thqueueIs file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * dqueueIstributed under the License queueIs dqueueIstributed on an "AS IS"
BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permqueueIssions and
- * limitations under the License.
- */
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
package kernel
diff --git a/kernel/request.go b/kernel/request.go
index c5e0fef..2647ad5 100644
--- a/kernel/request.go
+++ b/kernel/request.go
@@ -17,12 +17,19 @@ limitations under the License.
package kernel
-import "fmt"
+import (
+ "fmt"
+ "time"
+)
const (
- ReqPullMessage = int16(11)
- ReqGetRouteInfoByTopic = int16(105)
- ReqSendBatchMessage = int16(320)
+ ReqPullMessage = int16(11)
+ ReqHeartBeat = int16(34)
+ ReqGetConsumerListByGroup = int16(38)
+ ReqLockBatchMQ = int16(41)
+ ReqUnlockBatchMQ = int16(42)
+ ReqGetRouteInfoByTopic = int16(105)
+ ReqSendBatchMessage = int16(320)
)
type SendMessageRequest struct {
@@ -49,24 +56,24 @@ func (request *SendMessageRequest) Decode(properties
map[string]string) error {
}
type PullMessageRequest struct {
- ConsumerGroup string `json:"consumerGroup"`
- Topic string `json:"topic"`
- QueueId int32 `json:"queueId"`
- QueueOffset int64 `json:"queueOffset"`
- MaxMsgNums int32 `json:"maxMsgNums"`
- SysFlag int32 `json:"sysFlag"`
- CommitOffset int64 `json:"commitOffset"`
- SuspendTimeoutMillis int64 `json:"suspendTimeoutMillis"`
- SubExpression string `json:"subscription"`
- SubVersion int64 `json:"subVersion"`
- ExpressionType string `json:"expressionType"`
+ ConsumerGroup string `json:"consumerGroup"`
+ Topic string `json:"topic"`
+ QueueId int32 `json:"queueId"`
+ QueueOffset int64 `json:"queueOffset"`
+ MaxMsgNums int32 `json:"maxMsgNums"`
+ SysFlag int32 `json:"sysFlag"`
+ CommitOffset int64 `json:"commitOffset"`
+ SuspendTimeoutMillis time.Duration `json:"suspendTimeoutMillis"`
+ SubExpression string `json:"subscription"`
+ SubVersion int64 `json:"subVersion"`
+ ExpressionType string `json:"expressionType"`
}
func (request *PullMessageRequest) Encode() map[string]string {
maps := make(map[string]string)
maps["consumerGroup"] = request.ConsumerGroup
maps["topic"] = request.Topic
- maps["queueId"] = fmt.Sprintf("%d", request.QueueOffset)
+ maps["queueId"] = fmt.Sprintf("%d", request.QueueId)
maps["queueOffset"] = fmt.Sprintf("%d", request.QueueOffset)
maps["maxMsgNums"] = fmt.Sprintf("%d", request.MaxMsgNums)
maps["sysFlag"] = fmt.Sprintf("%d", request.SysFlag)
@@ -78,6 +85,16 @@ func (request *PullMessageRequest) Encode()
map[string]string {
return maps
}
+type GetConsumerList struct {
+ ConsumerGroup string `json:"consumerGroup"`
+}
+
+func (request *GetConsumerList) Encode() map[string]string {
+ maps := make(map[string]string)
+ maps["consumerGroup"] = request.ConsumerGroup
+ return maps
+}
+
type GetMaxOffsetRequest struct {
Topic string `json:"topic"`
QueueId int32 `json:"queueId"`
diff --git a/kernel/route.go b/kernel/route.go
index dc08edc..f8cd884 100644
--- a/kernel/route.go
+++ b/kernel/route.go
@@ -22,7 +22,9 @@ import (
"errors"
"github.com/apache/rocketmq-client-go/remote"
"github.com/apache/rocketmq-client-go/rlog"
+ "github.com/apache/rocketmq-client-go/utils"
"github.com/tidwall/gjson"
+ "math/rand"
"sort"
"strconv"
"strings"
@@ -32,7 +34,7 @@ import (
)
const (
- requestTimeout = 3000
+ requestTimeout = 3 * time.Second
defaultTopic = "TBW102"
defaultQueueNums = 4
MasterId = int64(0)
@@ -49,9 +51,10 @@ var (
// brokerName -> map[string]int32
brokerVersionMap sync.Map
- publishInfoMap sync.Map
- routeDataMap sync.Map
- lockNamesrv sync.Mutex
+ //publishInfoMap sync.Map
+ //subscribeInfoMap sync.Map
+ routeDataMap sync.Map
+ lockNamesrv sync.Mutex
)
// key is topic, value is TopicPublishInfo
@@ -59,7 +62,7 @@ type TopicPublishInfo struct {
OrderTopic bool
HaveTopicRouterInfo bool
MqList []*MessageQueue
- RouteData *topicRouteData
+ RouteData *TopicRouteData
TopicQueueIndex int32
}
@@ -76,55 +79,63 @@ func (info *TopicPublishInfo) fetchQueueIndex() int {
return int(qIndex) % length
}
-func UpdateTopicRouteInfo(topic string) {
+func UpdateTopicRouteInfo(topic string) *TopicRouteData {
// Todo process lock timeout
lockNamesrv.Lock()
defer lockNamesrv.Unlock()
- routeData, err := queryTopicRouteInfoFromServer(topic, requestTimeout)
+ routeData, err := queryTopicRouteInfoFromServer(topic)
if err != nil {
rlog.Warnf("query topic route from server error: %s", err)
- return
+ return nil
}
if routeData == nil {
rlog.Warnf("queryTopicRouteInfoFromServer return nil, Topic:
%s", topic)
- return
+ return nil
}
- var changed bool
oldRouteData, exist := routeDataMap.Load(topic)
- if !exist || routeData == nil {
- changed = true
- } else {
- changed =
topicRouteDataIsChange(oldRouteData.(*topicRouteData), routeData)
- }
-
- if !changed {
- changed = isNeedUpdateTopicRouteInfo(topic)
- } else {
- rlog.Infof("the topic[%s] route info changed, old[%v]
,new[%s]", topic, oldRouteData, routeData)
+ changed := true
+ if exist {
+ changed =
topicRouteDataIsChange(oldRouteData.(*TopicRouteData), routeData)
}
- if !changed {
- return
+ if changed {
+ routeDataMap.Store(topic, routeData)
+ rlog.Infof("the topic [%s] route info changed, old %v ,new %s",
topic,
+ oldRouteData, routeData.String())
+ for _, brokerData := range routeData.BrokerDataList {
+ brokerAddressesMap.Store(brokerData.BrokerName,
brokerData)
+ }
}
- newTopicRouteData := routeData.clone()
+ return routeData.clone()
+}
- for _, brokerData := range newTopicRouteData.BrokerDataList {
- brokerAddressesMap.Store(brokerData.BrokerName,
brokerData.BrokerAddresses)
+func FindBrokerAddrByTopic(topic string) string {
+ v, exist := routeDataMap.Load(topic)
+ if !exist {
+ return ""
}
-
- // update publish info
- publishInfo := routeData2PublishInfo(topic, routeData)
- publishInfo.HaveTopicRouterInfo = true
-
- old, _ := publishInfoMap.Load(topic)
- publishInfoMap.Store(topic, publishInfoMap)
- if old != nil {
- rlog.Infof("Old TopicPublishInfo [%s] removed.", old)
+ routeData := v.(*TopicRouteData)
+ if len(routeData.BrokerDataList) == 0 {
+ return ""
+ }
+ i := utils.AbsInt(rand.Int())
+ bd := routeData.BrokerDataList[i%len(routeData.BrokerDataList)]
+ addr := bd.BrokerAddresses[MasterId]
+ if addr == "" && len(bd.BrokerAddresses) > 0 {
+ i = i % len(bd.BrokerAddresses)
+ for _, v := range bd.BrokerAddresses {
+ if i <= 0 {
+ addr = v
+ break
+ }
+ i--
+ }
}
+ return addr
}
func FindBrokerAddressInPublish(brokerName string) string {
@@ -144,18 +155,20 @@ func FindBrokerAddressInSubscribe(brokerName string,
brokerId int64, onlyThisBro
found = false
)
- addrs, exist := brokerAddressesMap.Load(brokerName)
+ v, exist := brokerAddressesMap.Load(brokerName)
- if exist {
- for k, v := range addrs.(map[int64]string) {
- if v != "" {
- found = true
- if k != MasterId {
- slave = true
- }
- brokerAddr = v
- break
+ if !exist {
+ return nil
+ }
+ data := v.(*BrokerData)
+ for k, v := range data.BrokerAddresses {
+ if v != "" {
+ found = true
+ if k != MasterId {
+ slave = true
}
+ brokerAddr = v
+ break
}
}
@@ -172,7 +185,7 @@ func FindBrokerAddressInSubscribe(brokerName string,
brokerId int64, onlyThisBro
}
func FetchSubscribeMessageQueues(topic string) ([]*MessageQueue, error) {
- routeData, err := queryTopicRouteInfoFromServer(topic, 3*time.Second)
+ routeData, err := queryTopicRouteInfoFromServer(topic)
if err != nil {
return nil, err
@@ -190,14 +203,14 @@ func FetchSubscribeMessageQueues(topic string)
([]*MessageQueue, error) {
return mqs, nil
}
-func findBrokerVersion(brokerName, brokerAddr string) int {
+func findBrokerVersion(brokerName, brokerAddr string) int32 {
versions, exist := brokerVersionMap.Load(brokerName)
if !exist {
return 0
}
- v, exist := versions.(map[string]int)[brokerAddr]
+ v, exist := versions.(map[string]int32)[brokerAddr]
if exist {
return v
@@ -205,12 +218,12 @@ func findBrokerVersion(brokerName, brokerAddr string) int
{
return 0
}
-func queryTopicRouteInfoFromServer(topic string, timeout time.Duration)
(*topicRouteData, error) {
+func queryTopicRouteInfoFromServer(topic string) (*TopicRouteData, error) {
request := &GetRouteInfoRequest{
Topic: topic,
}
rc := remote.NewRemotingCommand(ReqGetRouteInfoByTopic, request, nil)
- response, err := remote.InvokeSync(getNameServerAddress(), rc, timeout)
+ response, err := remote.InvokeSync(getNameServerAddress(), rc,
requestTimeout)
if err != nil {
return nil, err
@@ -221,11 +234,11 @@ func queryTopicRouteInfoFromServer(topic string, timeout
time.Duration) (*topicR
if response.Body == nil {
return nil, errors.New(response.Remark)
}
- routeData := &topicRouteData{}
+ routeData := &TopicRouteData{}
err = routeData.decode(string(response.Body))
if err != nil {
- rlog.Warnf("decode topicRouteData error: %s", err)
+ rlog.Warnf("decode TopicRouteData error: %s", err)
return nil, err
}
return routeData, nil
@@ -236,7 +249,7 @@ func queryTopicRouteInfoFromServer(topic string, timeout
time.Duration) (*topicR
}
}
-func topicRouteDataIsChange(oldData *topicRouteData, newData *topicRouteData)
bool {
+func topicRouteDataIsChange(oldData *TopicRouteData, newData *TopicRouteData)
bool {
if oldData == nil || newData == nil {
return true
}
@@ -259,13 +272,7 @@ func topicRouteDataIsChange(oldData *topicRouteData,
newData *topicRouteData) bo
return !oldDataCloned.equals(newDataCloned)
}
-func isNeedUpdateTopicRouteInfo(topic string) bool {
- value, exist := publishInfoMap.Load(topic)
-
- return !exist || value.(*TopicPublishInfo).isOK()
-}
-
-func routeData2PublishInfo(topic string, data *topicRouteData)
*TopicPublishInfo {
+func routeData2PublishInfo(topic string, data *TopicRouteData)
*TopicPublishInfo {
publishInfo := &TopicPublishInfo{
RouteData: data,
OrderTopic: false,
@@ -329,16 +336,20 @@ func getNameServerAddress() string {
return "127.0.0.1:9876"
}
-// topicRouteData topicRouteData
-type topicRouteData struct {
+// TopicRouteData TopicRouteData
+type TopicRouteData struct {
OrderTopicConf string
QueueDataList []*QueueData `json:"queueDatas"`
BrokerDataList []*BrokerData `json:"brokerDatas"`
}
-func (routeData *topicRouteData) decode(data string) error {
+func (routeData *TopicRouteData) decode(data string) error {
res := gjson.Parse(data)
- json.Unmarshal([]byte(res.Get("queueDatas").String()),
&routeData.QueueDataList)
+ err := json.Unmarshal([]byte(res.Get("queueDatas").String()),
&routeData.QueueDataList)
+
+ if err != nil {
+ return err
+ }
bds := res.Get("brokerDatas").Array()
routeData.BrokerDataList = make([]*BrokerData, len(bds))
@@ -365,8 +376,8 @@ func (routeData *topicRouteData) decode(data string) error {
return nil
}
-func (routeData *topicRouteData) clone() *topicRouteData {
- cloned := &topicRouteData{
+func (routeData *TopicRouteData) clone() *TopicRouteData {
+ cloned := &TopicRouteData{
OrderTopicConf: routeData.OrderTopicConf,
QueueDataList: make([]*QueueData,
len(routeData.QueueDataList)),
BrokerDataList: make([]*BrokerData,
len(routeData.BrokerDataList)),
@@ -383,10 +394,15 @@ func (routeData *topicRouteData) clone() *topicRouteData {
return cloned
}
-func (routeData *topicRouteData) equals(data *topicRouteData) bool {
+func (routeData *TopicRouteData) equals(data *TopicRouteData) bool {
return false
}
+func (routeData *TopicRouteData) String() string {
+ data, _ := json.Marshal(routeData)
+ return string(data)
+}
+
// QueueData QueueData
type QueueData struct {
BrokerName string `json:"brokerName"`
diff --git a/kernel/validators.go b/kernel/validators.go
new file mode 100644
index 0000000..f34bd58
--- /dev/null
+++ b/kernel/validators.go
@@ -0,0 +1,46 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package kernel
+
+import (
+ "github.com/apache/rocketmq-client-go/rlog"
+ "regexp"
+)
+
+const (
+ _ValidPattern = "^[%|a-zA-Z0-9_-]+$"
+ _CharacterMaxLength = 255
+)
+
+var (
+ _Pattern, _ = regexp.Compile("_ValidPattern")
+)
+
+func ValidateGroup(group string) {
+ if group == "" {
+ rlog.Fatal("consumerGroup is empty")
+ }
+
+ //if !_Pattern.Match([]byte(group)) {
+ // rlog.Fatalf("the specified group[%s] contains illegal
characters, allowing only %s", group, _ValidPattern)
+ //}
+
+ if len(group) > _CharacterMaxLength {
+ rlog.Fatal("the specified group is longer than group max length
255.")
+ }
+}
diff --git a/remote/codes.go b/remote/codes.go
deleted file mode 100644
index b94d2e2..0000000
--- a/remote/codes.go
+++ /dev/null
@@ -1,147 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package remote
-
-const (
- SEND_MESSAGE = 10
- PULL_MESSAGE = 11
- QUERY_MESSAGE = 12
- QUERY_BROKER_OFFSET = 13
- QUERY_CONSUMER_OFFSET = 14
- UPDATE_CONSUMER_OFFSET = 15
- UPDATE_AND_CREATE_TOPIC = 17
- GET_ALL_TOPIC_CONFIG = 21
- GET_TOPIC_CONFIG_LIST = 22
- GET_TOPIC_NAME_LIST = 23
- UPDATE_BROKER_CONFIG = 25
- GET_BROKER_CONFIG = 26
- TRIGGER_DELETE_FILES = 27
- GET_BROKER_RUNTIME_INFO = 28
- SEARCH_OFFSET_BY_TIMESTAMP = 29
- GET_MAX_OFFSET = 30
- GET_MIN_OFFSET = 31
- GET_EARLIEST_MSG_STORETIME = 32
- VIEW_MESSAGE_BY_ID = 33
- HEART_BEAT = 34
- UNREGISTER_CLIENT = 35
- CONSUMER_SEND_MSG_BACK = 36
- END_TRANSACTION = 37
- GET_CONSUMER_LIST_BY_GROUP = 38
- CHECK_TRANSACTION_STATE = 39
- NOTIFY_CONSUMER_IDS_CHANGED = 40
- LOCK_BATCH_MQ = 41
- UNLOCK_BATCH_MQ = 42
- GET_ALL_CONSUMER_OFFSET = 43
- GET_ALL_DELAY_OFFSET = 45
- PUT_KV_CONFIG = 100
- GET_KV_CONFIG = 101
- DELETE_KV_CONFIG = 102
- REGISTER_BROKER = 103
- UNREGISTER_BROKER = 104
- GET_ROUTEINTO_BY_TOPIC = 105
- GET_BROKER_CLUSTER_INFO = 106
- UPDATE_AND_CREATE_SUBSCRIPTIONGROUP = 200
- GET_ALL_SUBSCRIPTIONGROUP_CONFIG = 201
- GET_TOPIC_STATS_INFO = 202
- GET_CONSUMER_CONNECTION_LIST = 203
- GET_PRODUCER_CONNECTION_LIST = 204
- WIPE_WRITE_PERM_OF_BROKER = 205
-
- GET_ALL_TOPIC_LIST_FROM_NAMESERVER = 206
- DELETE_SUBSCRIPTIONGROUP = 207
- GET_CONSUME_STATS = 208
- SUSPEND_CONSUMER = 209
- RESUME_CONSUMER = 210
- RESET_CONSUMER_OFFSET_IN_CONSUMER = 211
- RESET_CONSUMER_OFFSET_IN_BROKER = 212
- ADJUST_CONSUMER_THREAD_POOL = 213
- WHO_CONSUME_THE_MESSAGE = 214
-
- DELETE_TOPIC_IN_BROKER = 215
- DELETE_TOPIC_IN_NAMESRV = 216
- GET_KV_CONFIG_BY_VALUE = 217
- DELETE_KV_CONFIG_BY_VALUE = 218
- GET_KVLIST_BY_NAMESPACE = 219
-
- RESET_CONSUMER_CLIENT_OFFSET = 220
- GET_CONSUMER_STATUS_FROM_CLIENT = 221
- INVOKE_BROKER_TO_RESET_OFFSET = 222
- INVOKE_BROKER_TO_GET_CONSUMER_STATUS = 223
-
- QUERY_TOPIC_CONSUME_BY_WHO = 300
-
- GET_TOPICS_BY_CLUSTER = 224
-
- REGISTER_FILTER_SERVER = 301
- REGISTER_MESSAGE_FILTER_CLASS = 302
- QUERY_CONSUME_TIME_SPAN = 303
- GET_SYSTEM_TOPIC_LIST_FROM_NS = 304
- GET_SYSTEM_TOPIC_LIST_FROM_BROKER = 305
-
- CLEAN_EXPIRED_CONSUMEQUEUE = 306
-
- GET_CONSUMER_RUNNING_INFO = 307
-
- QUERY_CORRECTION_OFFSET = 308
-
- CONSUME_MESSAGE_DIRECTLY = 309
-
- SEND_MESSAGE_V2 = 310
-
- GET_UNIT_TOPIC_LIST = 311
- GET_HAS_UNIT_SUB_TOPIC_LIST = 312
- GET_HAS_UNIT_SUB_UNUNIT_TOPIC_LIST = 313
- CLONE_GROUP_OFFSET = 314
-
- VIEW_BROKER_STATS_DATA = 315
-)
-
-const (
- SUCCESS = 0
- SYSTEM_ERROR = 1
- SYSTEM_BUSY = 2
- REQUEST_CODE_NOT_SUPPORTED = 3
- TRANSACTION_FAILED = 4
- FLUSH_DISK_TIMEOUT = 10
- SLAVE_NOT_AVAILABLE = 11
- FLUSH_SLAVE_TIMEOUT = 12
- MESSAGE_ILLEGAL = 13
- SERVICE_NOT_AVAILABLE = 14
- VERSION_NOT_SUPPORTED = 15
- NO_PERMISSION = 16
- TOPIC_NOT_EXIST = 17
- TOPIC_EXIST_ALREADY = 18
- PULL_NOT_FOUND = 19
- PULL_RETRY_IMMEDIATELY = 20
- PULL_OFFSET_MOVED = 21
- QUERY_NOT_FOUND = 22
- SUBSCRIPTION_PARSE_FAILED = 23
- SUBSCRIPTION_NOT_EXIST = 24
- SUBSCRIPTION_NOT_LATEST = 25
- SUBSCRIPTION_GROUP_NOT_EXIST = 26
- TRANSACTION_SHOULD_COMMIT = 200
- TRANSACTION_SHOULD_ROLLBACK = 201
- TRANSACTION_STATE_UNKNOW = 202
- TRANSACTION_STATE_GROUP_WRONG = 203
- NO_BUYER_ID = 204
-
- NOT_IN_CURRENT_UNIT = 205
-
- CONSUMER_NOT_ONLINE = 206
-
- CONSUME_MSG_TIMEOUT = 207
-)
diff --git a/remote/client.go b/remote/remote_client.go
similarity index 88%
rename from remote/client.go
rename to remote/remote_client.go
index bfc4c36..9f5839a 100644
--- a/remote/client.go
+++ b/remote/remote_client.go
@@ -21,6 +21,7 @@ import (
"bytes"
"encoding/binary"
"errors"
+ "github.com/apache/rocketmq-client-go/rlog"
"io"
"net"
"sync"
@@ -164,22 +165,26 @@ func connect(addr string) (net.Conn, error) {
func receiveResponse(r net.Conn) {
scanner := createScanner(r)
- for scanner.Scan() {
+ for {
+ scanner.Scan()
receivedRemotingCommand, err := decode(scanner.Bytes())
if err != nil {
closeConnection(r)
+ rlog.Error(err.Error())
break
}
if receivedRemotingCommand.isResponseType() {
- resp, ok :=
responseTable.Load(receivedRemotingCommand.Opaque)
- if ok {
+ resp, exist :=
responseTable.Load(receivedRemotingCommand.Opaque)
+ if exist {
responseTable.Delete(receivedRemotingCommand.Opaque)
responseFuture := resp.(*ResponseFuture)
- responseFuture.ResponseCommand =
receivedRemotingCommand
- responseFuture.executeInvokeCallback()
- if responseFuture.Done != nil {
- responseFuture.Done <- true
- }
+ go func() {
+ responseFuture.ResponseCommand =
receivedRemotingCommand
+ responseFuture.executeInvokeCallback()
+ if responseFuture.Done != nil {
+ responseFuture.Done <- true
+ }
+ }()
}
} else {
// todo handler request from peer
@@ -190,10 +195,20 @@ func receiveResponse(r net.Conn) {
func createScanner(r io.Reader) *bufio.Scanner {
scanner := bufio.NewScanner(r)
scanner.Split(func(data []byte, atEOF bool) (int, []byte, error) {
+ defer func() {
+ if err := recover(); err != nil {
+ rlog.Errorf("panic: %v", err)
+ }
+ }()
if !atEOF {
if len(data) >= 4 {
var length int32
- binary.Read(bytes.NewReader(data[0:4]),
binary.BigEndian, &length)
+ err := binary.Read(bytes.NewReader(data[0:4]),
binary.BigEndian, &length)
+ if err != nil {
+ rlog.Errorf("split data error: %s",
err.Error())
+ return 0, nil, err
+ }
+
if int(length)+4 <= len(data) {
return int(length) + 4, data[4 :
length+4], nil
}
diff --git a/remote/client_test.go b/remote/remote_client_test.go
similarity index 100%
rename from remote/client_test.go
rename to remote/remote_client_test.go
diff --git a/rlog/log.go b/rlog/log.go
index c73c9ba..4f5adaf 100644
--- a/rlog/log.go
+++ b/rlog/log.go
@@ -22,8 +22,6 @@ import (
)
type Logger interface {
- Print(i ...interface{})
- Printf(format string, args ...interface{})
Debug(i ...interface{})
Debugf(format string, args ...interface{})
Info(i ...interface{})
@@ -42,14 +40,6 @@ func SetLogger(log Logger) {
rLog = log
}
-func Print(i ...interface{}) {
- rLog.Print(i...)
-}
-
-func Printf(format string, args ...interface{}) {
- rLog.Printf(format, args...)
-}
-
func Debug(i ...interface{}) {
rLog.Debug(i...)
}
diff --git a/utils/helper.go b/utils/helper.go
index 485ae15..1ad0441 100644
--- a/utils/helper.go
+++ b/utils/helper.go
@@ -22,7 +22,6 @@ import (
"encoding/binary"
"errors"
"fmt"
- "hash/crc32"
"net"
"os"
"sync"
@@ -99,14 +98,10 @@ func ClassLoaderID() int32 {
return 0
}
-// HashString hashes a string to a unique hashcode.
-func HashString(s string) int {
- if s == "" {
- return 0
- }
- return int(crc32.ChecksumIEEE([]byte(s)))
-}
-
func UnCompress(data []byte) []byte {
return data
}
+
+func IsArrayEmpty(i ...interface{}) bool {
+ return i == nil || len(i) == 0
+}
diff --git a/utils/helper_test.go b/utils/helper_test.go
index 4e1877c..967b0ae 100644
--- a/utils/helper_test.go
+++ b/utils/helper_test.go
@@ -16,7 +16,9 @@
*/
package utils
-import "testing"
+import (
+ "testing"
+)
func TestClassLoaderID(t *testing.T) {
if ClassLoaderID() != 0 {
diff --git a/utils/math.go b/utils/math.go
new file mode 100644
index 0000000..816631e
--- /dev/null
+++ b/utils/math.go
@@ -0,0 +1,32 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package utils
+
+func AbsInt(i int) int {
+ if i >= 0 {
+ return i
+ }
+ return -i
+}
+
+func MinInt(a, b int) int {
+ if a < b {
+ return a
+ }
+ return b
+}
diff --git a/utils/string.go b/utils/string.go
new file mode 100644
index 0000000..6a74808
--- /dev/null
+++ b/utils/string.go
@@ -0,0 +1,40 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package utils
+
+import "fmt"
+
+// HashString hashes a string to a unique hashcode.
+func HashString(s string) int {
+ val := []byte(s)
+ var h int
+
+ for idx := range val {
+ h = 31*h + int(val[idx])
+ }
+
+ return h
+}
+
+func StrJoin(str, key string, value interface{}) string {
+ if key == "" || value == "" {
+ return str
+ }
+
+ return str + key + ": " + fmt.Sprint(value) + ", "
+}