This is an automated email from the ASF dual-hosted git repository.

dinglei pushed a commit to branch native
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git


The following commit(s) were added to refs/heads/native by this push:
     new cb6cc2d  Adding consumer of original (#39)
cb6cc2d is described below

commit cb6cc2d3e1647994b5955c7e05e1e6ada7e0387b
Author: wenfeng <[email protected]>
AuthorDate: Tue Mar 12 20:50:20 2019 +0800

    Adding consumer of original (#39)
    
    * Adding more implementations of common
    
    * Adding consumer of original
    
    * fix compile error
---
 consumer.go                                   | 269 ++++++++++++++++++++++++++
 go.mod                                        |   1 -
 go.sum                                        |   2 -
 common/manager.go => kernel/client.go         | 121 ++++--------
 {common => kernel}/message.go                 |  15 +-
 kernel/model.go                               | 241 +++++++++++++++++++++++
 common/transaction.go => kernel/mq_version.go |  15 +-
 {common => kernel}/perm.go                    |  26 +--
 {common => kernel}/request.go                 |   5 +-
 {common => kernel}/response.go                |   2 +-
 {common => kernel}/route.go                   | 134 +++++++++----
 {common => kernel}/transaction.go             |   2 +-
 remote/client.go                              |  12 ++
 common/transaction.go => utils/helper.go      |  15 +-
 common/perm.go => utils/log.go                |  64 +++---
 15 files changed, 726 insertions(+), 198 deletions(-)

diff --git a/consumer.go b/consumer.go
new file mode 100644
index 0000000..896df74
--- /dev/null
+++ b/consumer.go
@@ -0,0 +1,269 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package rocketmq
+
+import (
+       "context"
+       "errors"
+       "fmt"
+       "github.com/apache/rocketmq-client-go/kernel"
+       "strconv"
+       "sync"
+       "time"
+)
+
+// Consumer is the message-consuming API of this package; NewConsumer returns
+// the default implementation.
+type Consumer interface {
+       // Pull requests up to numbers messages of topic filtered by expression.
+       Pull(topic, expression string, numbers int) (*kernel.PullResult, error)
+       // SubscribeWithChan subscribes to topic and hands messages over a channel;
+       // per the implementation's comment, messages are acknowledged manually.
+       SubscribeWithChan(topic, expression string) (chan *kernel.Message, 
error)
+       // SubscribeWithFunc subscribes to topic and passes each message to f; per
+       // the implementation's comment, acknowledgement is automatic.
+       SubscribeWithFunc(topic, expression string, f func(msg *kernel.Message) 
ConsumeResult) error
+       // ACK reports the consume result of msg.
+       ACK(msg *kernel.Message, result ConsumeResult)
+}
+
+// ConsumeResult is the outcome a caller reports for a consumed message.
+// No values are declared for it in this file.
+type ConsumeResult int
+
+// ConsumerType enumerates the kinds of consumer.
+type ConsumerType int
+
+// Consumer types, numbered from 0 in declaration order.
+const (
+       Original ConsumerType = iota
+       Orderly
+       Transaction
+)
+
+// ConsumerConfig carries the settings passed to NewConsumer.
+type ConsumerConfig struct {
+       // GroupName is sent as the ConsumerGroup of pull requests.
+       GroupName                  string
+       Model                      kernel.MessageModel
+       UnitMode                   bool
+       MaxReconsumeTimes          int
+       PullMessageTimeout         time.Duration
+       FromWhere                  kernel.ConsumeFromWhere
+       // brokerSuspendMaxTimeMillis is sent as SuspendTimeoutMillis of pull
+       // requests. It is unexported, so code outside this package cannot set it.
+       brokerSuspendMaxTimeMillis int64
+}
+
+// NewConsumer builds the default Consumer implementation around a copy of
+// config. The returned consumer's state is left at its zero value.
+func NewConsumer(config ConsumerConfig) Consumer {
+       consumer := new(defaultConsumer)
+       consumer.config = config
+       return consumer
+}
+
+// defaultConsumer is the Consumer implementation returned by NewConsumer.
+type defaultConsumer struct {
+       // state must equal kernel.Running for pull to proceed (see makeSureStateOK).
+       state  kernel.ServiceState
+       // config is the configuration supplied to NewConsumer.
+       config ConsumerConfig
+}
+
+// Pull requests up to numbers messages of topic that match expression.
+//
+// It picks a queue with getNextQueueOf, resolves the subscription data for
+// expression, issues the pull at the offset reported by nextOffsetOf and then
+// post-processes the result with processPullResult.
+//
+// An error is returned when no queue or no subscription data is available for
+// topic, or when the underlying pull fails.
+func (c *defaultConsumer) Pull(topic, expression string, numbers int) (*kernel.PullResult, error) {
+       mq := getNextQueueOf(topic)
+       if mq == nil {
+               return nil, fmt.Errorf("prepared to pull topic: %s, but no queue was found", topic)
+       }
+
+       // pull and processPullResult both dereference the subscription data, so a
+       // missing entry must be reported here instead of panicking later.
+       data := getSubscriptionData(mq, expression)
+       if data == nil {
+               return nil, fmt.Errorf("no subscription data for topic: %s, expression: %s", topic, expression)
+       }
+
+       result, err := c.pull(context.Background(), mq, data, c.nextOffsetOf(mq), numbers)
+       if err != nil {
+               return nil, err
+       }
+
+       processPullResult(mq, result, data)
+       return result, nil
+}
+
+// SubscribeWithChan is meant to deliver messages over a channel with manual
+// acknowledgement. It is not implemented yet: it returns a nil channel and a
+// nil error.
+// NOTE(review): receiving from the returned nil channel blocks forever;
+// callers cannot detect the missing implementation from the error value.
+func (c *defaultConsumer) SubscribeWithChan(topic, expression string) (chan 
*kernel.Message, error) {
+       return nil, nil
+}
+
+// SubscribeWithFunc is meant to pass each message to f with automatic
+// acknowledgement. It is not implemented yet: f is never called and nil is
+// returned.
+func (c *defaultConsumer) SubscribeWithFunc(topic, expression string,
+       f func(msg *kernel.Message) ConsumeResult) error {
+       return nil
+}
+
+// ACK reports the consume result of msg. It is currently a no-op.
+func (c *defaultConsumer) ACK(msg *kernel.Message, result ConsumeResult) {
+
+}
+
+// pull sends one pull request for mq to its broker and returns the reply of
+// kernel.PullMessage.
+//
+// It fails when the consumer is not running, when mq or data is nil, when
+// offset is negative, or when no broker can be resolved for mq. A non-positive
+// numbers is treated as 1.
+func (c *defaultConsumer) pull(ctx context.Context, mq *kernel.MessageQueue, data *kernel.SubscriptionData,
+       offset int64, numbers int) (*kernel.PullResult, error) {
+       if err := c.makeSureStateOK(); err != nil {
+               return nil, err
+       }
+
+       if mq == nil {
+               return nil, errors.New("MessageQueue is nil")
+       }
+
+       // data is dereferenced several times below.
+       if data == nil {
+               return nil, errors.New("SubscriptionData is nil")
+       }
+
+       if offset < 0 {
+               return nil, errors.New("offset < 0")
+       }
+
+       if numbers <= 0 {
+               numbers = 1
+       }
+       c.subscriptionAutomatically(mq.Topic)
+
+       brokerResult := tryFindBroker(mq)
+       if brokerResult == nil {
+               return nil, fmt.Errorf("the broker %s does not exist", mq.BrokerName)
+       }
+
+       // An empty expression type counts as TAG, matching kernel.IsTagType.
+       isTag := kernel.IsTagType(string(data.ExpType))
+
+       // Only non-TAG expressions depend on the broker version; the previous
+       // condition rejected TAG filtering instead.
+       if !isTag && brokerResult.BrokerVersion < kernel.V4_1_0 {
+               return nil, fmt.Errorf("the broker [%s, %v] does not upgrade to support for filter message by %v",
+                       mq.BrokerName, brokerResult.BrokerVersion, data.ExpType)
+       }
+
+       sysFlag := buildSysFlag(false, true, true, false)
+       if brokerResult.Slave {
+               sysFlag = clearCommitOffsetFlag(sysFlag)
+       }
+
+       // TAG subscriptions send version 0; other expression types send the
+       // version recorded in the subscription data.
+       var subVersion int64
+       if !isTag {
+               subVersion = data.SubVersion
+       }
+
+       pullRequest := &kernel.PullMessageRequest{
+               ConsumerGroup:        c.config.GroupName,
+               Topic:                mq.Topic,
+               QueueId:              int32(mq.QueueId),
+               QueueOffset:          offset,
+               MaxMsgNums:           int32(numbers),
+               SysFlag:              sysFlag,
+               CommitOffset:         0,
+               SuspendTimeoutMillis: c.config.brokerSuspendMaxTimeMillis,
+               SubExpression:        data.SubString,
+               SubVersion:           subVersion,
+               ExpressionType:       string(data.ExpType),
+       }
+
+       // TODO computePullFromWhichFilterServer
+       return kernel.PullMessage(ctx, brokerResult.BrokerAddr, pullRequest)
+}
+
+// makeSureStateOK returns an error unless the consumer state is
+// kernel.Running.
+func (c *defaultConsumer) makeSureStateOK() error {
+       if c.state != kernel.Running {
+               // kernel.ServiceState is an int type; %v prints its value, whereas %s
+               // would render as "%!s(kernel.ServiceState=N)".
+               return fmt.Errorf("the consumer state is [%v], not running", c.state)
+       }
+       return nil
+}
+
+// subscriptionAutomatically is called by pull with the queue's topic before a
+// request is sent. It is not implemented yet.
+func (c *defaultConsumer) subscriptionAutomatically(topic string) {
+       // TODO
+}
+
+// nextOffsetOf returns the offset at which the next pull of queue starts.
+// It is a stub that always returns 0.
+func (c *defaultConsumer) nextOffsetOf(queue *kernel.MessageQueue) int64 {
+       return 0
+}
+
+// toMessage currently ignores messageExts and returns an empty, non-nil slice.
+func toMessage(messageExts []*kernel.MessageExt) []*kernel.Message {
+       return []*kernel.Message{}
+}
+
+// processPullResult post-processes a pull result in place.
+//
+// It records the broker id suggested by the result for mq. When messages were
+// found it optionally re-filters them by the subscription's tags, copies the
+// unique client message id into TransactionId for messages flagged as
+// transaction-prepared, stamps every message with the result's min and max
+// offsets, and stores the final list back into result.
+func processPullResult(mq *kernel.MessageQueue, result *kernel.PullResult, data *kernel.SubscriptionData) {
+       updatePullFromWhichNode(mq, result.SuggestWhichBrokerId)
+       switch result.Status {
+       case kernel.PullFound:
+               msgs := result.GetMessageExts()
+               msgListFilterAgain := msgs
+               // NOTE(review): the Java client re-filters by tag only when class
+               // filter mode is off; confirm whether this condition should be
+               // !data.ClassFilterMode.
+               if len(data.Tags) > 0 && data.ClassFilterMode {
+                       // Length 0 with capacity len(msgs): a non-zero length would leave
+                       // leading nil entries ahead of the appended messages and the loop
+                       // below would dereference them.
+                       msgListFilterAgain = make([]*kernel.MessageExt, 0, len(msgs))
+                       for _, msg := range msgs {
+                               if _, exist := data.Tags[msg.GetTags()]; exist {
+                                       msgListFilterAgain = append(msgListFilterAgain, msg)
+                               }
+                       }
+               }
+
+               // TODO hook
+
+               for _, msg := range msgListFilterAgain {
+                       // A missing or malformed flag parses as false, so the error is
+                       // deliberately ignored.
+                       traFlag, _ := strconv.ParseBool(msg.Properties[kernel.TransactionPrepared])
+                       if traFlag {
+                               msg.TransactionId = msg.Properties[kernel.UniqueClientMessageIdKeyIndex]
+                       }
+
+                       msg.Properties[kernel.MinOffset] = strconv.FormatInt(result.MinOffset, 10)
+                       msg.Properties[kernel.MaxOffset] = strconv.FormatInt(result.MaxOffset, 10)
+               }
+
+               result.SetMessageExts(msgListFilterAgain)
+       }
+}
+
+// getSubscriptionData returns the subscription data for mq and exp.
+// It is a stub that always returns nil.
+func getSubscriptionData(mq *kernel.MessageQueue, exp string) 
*kernel.SubscriptionData {
+       return nil
+}
+
+// getNextQueueOf returns the queue of topic to pull from next.
+// It is a stub that always returns nil, so Pull currently always fails with
+// its "no queue" error.
+func getNextQueueOf(topic string) *kernel.MessageQueue {
+       return nil
+}
+
+// buildSysFlag packs four switches into a bit mask: commitOffset is bit 0,
+// suspend bit 1, subscription bit 2 and classFilter bit 3.
+func buildSysFlag(commitOffset, suspend, subscription, classFilter bool) int32 {
+       var flag int32
+       // The array position of each switch is its bit position.
+       for bit, enabled := range [...]bool{commitOffset, suspend, subscription, classFilter} {
+               if enabled {
+                       flag |= 1 << uint(bit)
+               }
+       }
+       return flag
+}
+
+// clearCommitOffsetFlag returns sysFlag with bit 0 (the commit-offset bit set
+// by buildSysFlag) cleared.
+func clearCommitOffsetFlag(sysFlag int32) int32 {
+       return sysFlag &^ 0x1
+}
+
+// tryFindBroker resolves the broker to pull mq from.
+//
+// It looks the broker up with the broker id chosen by
+// recalculatePullFromWhichNode. If that fails, it refreshes the route info of
+// the queue's topic and looks once more. It returns nil when the broker is
+// still unknown after the refresh.
+func tryFindBroker(mq *kernel.MessageQueue) *kernel.FindBrokerResult {
+       result := kernel.FindBrokerAddressInSubscribe(mq.BrokerName, recalculatePullFromWhichNode(mq), false)
+       // A successful first lookup is returned directly; a second lookup is only
+       // useful after the route refresh.
+       if result != nil {
+               return result
+       }
+
+       kernel.UpdateTopicRouteInfo(mq.Topic)
+       return kernel.FindBrokerAddressInSubscribe(mq.BrokerName, recalculatePullFromWhichNode(mq), false)
+}
+
+var (
+       // pullFromWhichNodeTable maps a kernel.MessageQueue value to the broker id
+       // (int64) suggested for it. The queue value is the key rather than its
+       // HashCode, so two queues whose hashes collide cannot overwrite each
+       // other's entry.
+       pullFromWhichNodeTable sync.Map
+)
+
+// updatePullFromWhichNode records brokerId as the broker to pull mq from.
+func updatePullFromWhichNode(mq *kernel.MessageQueue, brokerId int64) {
+       pullFromWhichNodeTable.Store(*mq, brokerId)
+}
+
+// recalculatePullFromWhichNode returns the broker id recorded for mq, or
+// kernel.MasterId when none has been recorded.
+func recalculatePullFromWhichNode(mq *kernel.MessageQueue) int64 {
+       if v, exist := pullFromWhichNodeTable.Load(*mq); exist {
+               return v.(int64)
+       }
+       return kernel.MasterId
+}
diff --git a/go.mod b/go.mod
index 809e20a..fa910f6 100644
--- a/go.mod
+++ b/go.mod
@@ -5,7 +5,6 @@ go 1.12
 require (
        github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc // 
indirect
        github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf // 
indirect
-       github.com/pkg/errors v0.8.1
        github.com/sirupsen/logrus v1.3.0
        github.com/stretchr/testify v1.3.0
        gopkg.in/alecthomas/kingpin.v2 v2.2.6
diff --git a/go.sum b/go.sum
index 62f4cad..0b0e0c1 100644
--- a/go.sum
+++ b/go.sum
@@ -7,8 +7,6 @@ github.com/davecgh/go-spew v1.1.1 
h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c
 github.com/davecgh/go-spew v1.1.1/go.mod 
h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
 github.com/konsorten/go-windows-terminal-sequences v1.0.1 
h1:mweAR1A6xJ3oS2pRaGiHgQ4OO8tzTaLawm8vnODuwDk=
 github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod 
h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
-github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I=
-github.com/pkg/errors v0.8.1/go.mod 
h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
 github.com/pmezard/go-difflib v1.0.0 
h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
 github.com/pmezard/go-difflib v1.0.0/go.mod 
h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
 github.com/sirupsen/logrus v1.3.0 
h1:hI/7Q+DtNZ2kINb6qt/lS+IyXnHQe9e90POfeewL/ME=
diff --git a/common/manager.go b/kernel/client.go
similarity index 71%
rename from common/manager.go
rename to kernel/client.go
index 83e2934..b8b6eee 100644
--- a/common/manager.go
+++ b/kernel/client.go
@@ -15,12 +15,16 @@ See the License for the specific language governing 
permissions and
 limitations under the License.
 */
 
-package common
+package kernel
 
 import (
        "context"
-       "fmt"
+       "errors"
        "github.com/apache/rocketmq-client-go/remote"
+       "github.com/apache/rocketmq-client-go/utils"
+       "os"
+       "strconv"
+       "sync"
        "time"
 )
 
@@ -29,6 +33,23 @@ const (
        tranceOff            = "false"
 )
 
+// Package-level client settings, initialized once at package load. Several of
+// them are read from environment variables.
+var (
+       log                           = utils.RLog
+       // namesrvAddrs comes from the "rocketmq.namesrv.addr" environment variable.
+       namesrvAddrs                  = os.Getenv("rocketmq.namesrv.addr")
+       clientIP                      = utils.LocalIP()
+       // instanceName comes from the "rocketmq.client.name" environment variable.
+       instanceName                  = os.Getenv("rocketmq.client.name")
+       pollNameServerInterval        = 30 * time.Second
+       heartbeatBrokerInterval       = 30 * time.Second
+       persistConsumerOffsetInterval = 5 * time.Second
+       unitMode                      = false
+       // The parse error is discarded: an unset or malformed value yields false.
+       vipChannelEnabled, _          = 
strconv.ParseBool(os.Getenv("com.rocketmq.sendMessageWithVIPChannel"))
+       // clientID has the form "<clientIP>@<instanceName>".
+       clientID                      = clientIP + "@" + instanceName
+)
+
+var (
+       // ErrServiceState is the sentinel error for a service that is not in the
+       // Running state.
+       ErrServiceState = errors.New("service state is not Running, please 
check")
+)
+
 type InnerProducer interface {
        PublishTopicList() []string
        UpdateTopicPublishInfo(topic string, info *TopicPublishInfo)
@@ -51,8 +72,9 @@ type InnerConsumer interface {
 func SendMessageSync(ctx context.Context, brokerAddrs, brokerName string, 
request *SendMessageRequest,
        msgs []*Message) (*SendResult, error) {
        cmd := remote.NewRemotingCommand(SendBatchMessage, request, 
encodeMessages(msgs))
-       response, err := client.InvokeSync(brokerAddrs, cmd, 3*time.Second)
+       response, err := remote.InvokeSync(brokerAddrs, cmd, 3 * time.Second)
        if err != nil {
+               log.Warningf("send messages with sync error: %v", err)
                return nil, err
        }
 
@@ -68,14 +90,13 @@ func SendMessageAsync(ctx context.Context, brokerAddrs, 
brokerName string, reque
 func SendMessageOneWay(ctx context.Context, brokerAddrs string, request 
*SendMessageRequest,
        msgs []*Message) (*SendResult, error) {
        cmd := remote.NewRemotingCommand(SendBatchMessage, request, 
encodeMessages(msgs))
-       err := client.InvokeOneWay(brokerAddrs, cmd)
+       err := remote.InvokeOneWay(brokerAddrs, cmd)
+       if err != nil {
+               log.Warningf("send messages with oneway error: %v", err)
+       }
        return nil, err
 }
 
-func encodeMessages(message []*Message) []byte {
-       return nil
-}
-
 func processSendResponse(brokerName string, msgs []*Message, cmd 
*remote.RemotingCommand) *SendResult {
        var status SendStatus
        switch cmd.Code {
@@ -96,8 +117,7 @@ func processSendResponse(brokerName string, msgs []*Message, 
cmd *remote.Remotin
 
        msgIDs := make([]string, 0)
        for i := 0; i < len(msgs); i++ {
-               msgIDs = append(msgIDs, 
msgs[i].Properties[UniqueClientMessageIdKeyindex])
-
+               msgIDs = append(msgIDs, 
msgs[i].Properties[UniqueClientMessageIdKeyIndex])
        }
 
        regionId := cmd.ExtFields[MsgRegion]
@@ -153,77 +173,18 @@ func UpdateConsumerOffset(consumerGroup, topic string, 
queue int, offset int64)
        return nil
 }
 
-//SendStatus message send result
-type SendStatus int
-
-const (
-       SendOK SendStatus = iota
-       SendFlushDiskTimeout
-       SendFlushSlaveTimeout
-       SendSlaveNotAvailable
-)
-
-// SendResult rocketmq send result
-type SendResult struct {
-       Status        SendStatus
-       MsgIDs        []string
-       MessageQueue  *MessageQueue
-       QueueOffset   int64
-       TransactionID string
-       OffsetMsgID   string
-       RegionID      string
-       TraceOn       bool
-}
-
-// SendResult send message result to string(detail result)
-func (result *SendResult) String() string {
-       return fmt.Sprintf("SendResult [sendStatus=%d, msgIds=%s, 
offsetMsgId=%s, queueOffset=%d, messageQueue=%s]",
-               result.Status, result.MsgIDs, result.OffsetMsgID, 
result.QueueOffset, result.MessageQueue.String())
-}
-
-// PullResult the pull result
-type PullResult struct {
-       NextBeginOffset int64
-       MinOffset       int64
-       MaxOffset       int64
-       Status          PullStatus
-       Messages        []*MessageExt
-}
-
-// PullStatus pull status
-type PullStatus int
+var (
+       // group -> InnerProducer
+       producerMap sync.Map
 
-// predefined pull status
-const (
-       PullFound PullStatus = iota
-       PullNoNewMsg
-       PullNoMatchedMsg
-       PullOffsetIllegal
-       PullBrokerTimeout
+       // group -> InnerConsumer
+       consumerMap sync.Map
 )
 
-// MessageQueue message queue
-type MessageQueue struct {
-       Topic      string `json:"topic"`
-       BrokerName string `json:"brokerName"`
-       QueueId    int    `json:"queueId"`
-}
-
-func (mq *MessageQueue) String() string {
-       return fmt.Sprintf("MessageQueue [topic=%s, brokerName=%s, 
queueId=%d]", mq.Topic, mq.BrokerName, mq.QueueId)
-}
-
 func CheckClientInBroker() {
 
 }
 
-func SendHeartbeatToAllBrokerWithLock() {
-
-}
-
-func UpdateTopicRouteInfoFromNameServer(topic string) {
-}
-
 func RegisterConsumer(group string, consumer InnerConsumer) {
 
 }
@@ -248,16 +209,10 @@ func SelectConsumer(group string) InnerConsumer {
        return nil
 }
 
-func FindBrokerAddressInPublish(brokerName string) {
-
-}
-
-func FindBrokerAddressInSubscribe(brokerName string, brokerId int64, 
onlyThisBroker bool) *FindBrokerResult {
+func encodeMessages(message []*Message) []byte {
        return nil
 }
 
-type FindBrokerResult struct {
-       brokerAddr    string
-       slave         bool
-       brokerVersion int32
+func sendHeartbeatToAllBroker() {
+
 }
diff --git a/common/message.go b/kernel/message.go
similarity index 94%
rename from common/message.go
rename to kernel/message.go
index 08ead01..d65c48f 100644
--- a/common/message.go
+++ b/kernel/message.go
@@ -15,7 +15,7 @@ See the License for the specific language governing 
permissions and
 limitations under the License.
 */
 
-package common
+package kernel
 
 import "fmt"
 
@@ -40,7 +40,7 @@ const (
        ReconsumeTime                  = "RECONSUME_TIME"
        MsgRegion                      = "MSG_REGION"
        TraceSwitch                    = "TRACE_ON"
-       UniqueClientMessageIdKeyindex  = "UNIQ_KEY"
+       UniqueClientMessageIdKeyIndex  = "UNIQ_KEY"
        MaxReconsumeTimes              = "MAX_RECONSUME_TIMES"
        ConsumeStartTime               = "CONSUME_START_TIME"
        TranscationPreparedQueueOffset = "TRAN_PREPARED_QUEUE_OFFSET"
@@ -69,9 +69,10 @@ func (msg *Message) String() string {
                msg.Topic, string(msg.Body), msg.Flag, msg.Properties, 
msg.TransactionId)
 }
 
-func (msg *Message) SetTags(tags string) {
-       msg.Properties[tags] = tags
-}
+//
+//func (msg *Message) SetTags(tags string) {
+//     msg.Properties[tags] = tags
+//}
 
 func (msg *Message) PutProperty(key, value string) {
        msg.Properties[key] = value
@@ -104,6 +105,10 @@ type MessageExt struct {
        PreparedTransactionOffset int64
 }
 
+// GetTags returns the value stored under the Tags property key, or "" when
+// the property is absent.
+func (msgExt *MessageExt) GetTags() string {
+       return msgExt.Properties[Tags]
+}
+
 func (msgExt *MessageExt) String() string {
        return fmt.Sprint("[Message=%s, MsgId=%s, QueueId=%d, StoreSize=%d, 
QueueOffset=%d, SysFlag=%d, "+
                "BornTimestamp=%d, BornHost=%s, StoreTimestamp=%d, 
StoreHost=%s, CommitLogOffset=%d, BodyCRC=%d, "+
diff --git a/kernel/model.go b/kernel/model.go
new file mode 100644
index 0000000..9ee30cb
--- /dev/null
+++ b/kernel/model.go
@@ -0,0 +1,241 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package kernel
+
+import (
+       "fmt"
+       "github.com/apache/rocketmq-client-go/utils"
+)
+
+// SendStatus is the outcome of sending a message.
+type SendStatus int
+
+// Send statuses, numbered from 0 in declaration order.
+const (
+       SendOK SendStatus = iota
+       SendFlushDiskTimeout
+       SendFlushSlaveTimeout
+       SendSlaveNotAvailable
+)
+
+// SendResult is the result of a RocketMQ send operation.
+type SendResult struct {
+       Status        SendStatus
+       MsgIDs        []string
+       MessageQueue  *MessageQueue
+       QueueOffset   int64
+       TransactionID string
+       OffsetMsgID   string
+       RegionID      string
+       TraceOn       bool
+}
+
+// String renders the send result: status, message ids, offset message id,
+// queue offset and message queue.
+func (result *SendResult) String() string {
+       // MessageQueue is handed to fmt as a Stringer instead of calling
+       // MessageQueue.String() directly: for a nil *MessageQueue fmt prints
+       // "<nil>", whereas the direct call would panic on the nil dereference.
+       return fmt.Sprintf("SendResult [sendStatus=%d, msgIds=%s, offsetMsgId=%s, queueOffset=%d, messageQueue=%s]",
+               result.Status, result.MsgIDs, result.OffsetMsgID, result.QueueOffset, result.MessageQueue)
+}
+
+// PullStatus is the outcome of a pull request.
+type PullStatus int
+
+// Predefined pull statuses, numbered from 0 in declaration order.
+const (
+       PullFound PullStatus = iota
+       PullNoNewMsg
+       PullNoMatchedMsg
+       PullOffsetIllegal
+       PullBrokerTimeout
+)
+
+// PullResult is the result of a pull request.
+type PullResult struct {
+       NextBeginOffset      int64
+       MinOffset            int64
+       MaxOffset            int64
+       Status               PullStatus
+       SuggestWhichBrokerId int64
+       // messageBinary is reset to nil by SetMessageExts.
+       // NOTE(review): presumably the undecoded message payload; confirm.
+       messageBinary        []byte
+       // messageExts is read and replaced through GetMessageExts/SetMessageExts.
+       messageExts          []*MessageExt
+}
+
+// GetMessageExts returns the stored extended messages, which may be nil.
+func (result *PullResult) GetMessageExts() []*MessageExt {
+       return result.messageExts
+}
+
+// SetMessageExts stores msgExts as the result's messages and drops the binary
+// message buffer.
+func (result *PullResult) SetMessageExts(msgExts []*MessageExt) {
+       result.messageExts, result.messageBinary = msgExts, nil
+}
+
+// GetMessages converts the stored extended messages with toMessages. When no
+// messages are stored it returns an empty, non-nil slice.
+func (result *PullResult) GetMessages() []*Message {
+       if len(result.messageExts) > 0 {
+               return toMessages(result.messageExts)
+       }
+       return make([]*Message, 0)
+}
+
+// toMessages currently ignores messageExts and returns an empty, non-nil
+// slice.
+func toMessages(messageExts []*MessageExt) []*Message {
+       return []*Message{}
+}
+
+// MessageQueue identifies a queue by topic, broker name and queue id.
+type MessageQueue struct {
+       Topic      string `json:"topic"`
+       BrokerName string `json:"brokerName"`
+       QueueId    int    `json:"queueId"`
+}
+
+// String renders the queue's topic, broker name and queue id.
+func (mq *MessageQueue) String() string {
+       const layout = "MessageQueue [topic=%s, brokerName=%s, queueId=%d]"
+       return fmt.Sprintf(layout, mq.Topic, mq.BrokerName, mq.QueueId)
+}
+
+// HashCode folds the broker name hash, the queue id and the topic hash, in
+// that order, into a 31-based polynomial hash seeded with 1.
+func (mq *MessageQueue) HashCode() int {
+       hash := 31 + utils.HashString(mq.BrokerName)
+       hash = hash*31 + mq.QueueId
+       return hash*31 + utils.HashString(mq.Topic)
+}
+
+// FindBrokerResult describes a broker found for a lookup: its address,
+// whether it is a slave, and its version.
+type FindBrokerResult struct {
+       BrokerAddr    string
+       Slave         bool
+       BrokerVersion int
+}
+
+type (
+       // groupName of producer
+       producerData string
+
+       // consumeType names how messages are consumed ("PULL" or "PUSH").
+       consumeType string
+
+       // MessageModel is BroadCasting or Clustering.
+       MessageModel     int
+       // ConsumeFromWhere selects where consumption starts.
+       ConsumeFromWhere int
+       // ServiceState is the lifecycle state of a client service.
+       ServiceState int
+)
+
+// Consume types.
+const (
+       ConsumeActively  = consumeType("PULL")
+       ConsumePassively = consumeType("PUSH")
+)
+
+// Message models.
+const (
+       BroadCasting = MessageModel(1)
+       Clustering   = MessageModel(2)
+)
+
+// Starting positions for consumption. iota is the index of a constant within
+// its const block, so each enumeration has its own block in order to start
+// at 0; sharing one block numbered these from 4.
+const (
+       ConsumeFromLastOffset ConsumeFromWhere = iota
+       ConsumeFromFirstOffset
+       ConsumeFromTimestamp
+)
+
+// Service states. CreateJust is 0, so the zero value of ServiceState is a
+// declared state.
+const (
+       CreateJust ServiceState = iota
+       Running
+       Shutdown
+)
+
+// String returns the name of the message model, or "Unknown" for a value that
+// is neither BroadCasting nor Clustering.
+func (mode MessageModel) String() string {
+       if mode == BroadCasting {
+               return "BroadCasting"
+       }
+       if mode == Clustering {
+               return "Clustering"
+       }
+       return "Unknown"
+}
+
+// ExpressionType names the language of a subscription filter expression.
+type ExpressionType string
+
+const (
+       // SQL92 selects SQL92-style filter expressions.
+       //
+       // Keywords: AND, OR, NOT, BETWEEN, IN, TRUE, FALSE, IS, NULL.
+       //
+       // Data types:
+       //   - Boolean, like: TRUE, FALSE
+       //   - String, like: 'abc'
+       //   - Decimal, like: 123
+       //   - Float number, like: 3.1415
+       //
+       // Grammar:
+       //   - AND, OR
+       //   - >, >=, <, <=, =
+       //   - BETWEEN A AND B, equal to >=A AND <=B
+       //   - NOT BETWEEN A AND B, equal to >B OR <A
+       //   - IN ('a', 'b'), equal to ='a' OR ='b'; this operation only supports
+       //     the String type
+       //   - IS NULL, IS NOT NULL: check whether a parameter is null or not
+       //   - =TRUE, =FALSE: check whether a parameter is true or false
+       //
+       // Example:
+       //   (a > 10 AND a < 100) OR (b IS NOT NULL AND b=TRUE)
+       SQL92 = ExpressionType("SQL92")
+
+       // TAG selects tag filter expressions. Only the "or" operation is
+       // supported, such as "tag1 || tag2 || tag3". A null or "*" expression
+       // means subscribe to all.
+       TAG = ExpressionType("TAG")
+)
+
+// IsTagType reports whether exp names the TAG expression type. An empty
+// string counts as TAG.
+func IsTagType(exp string) bool {
+       return exp == "" || exp == "TAG"
+}
+
+// SubAll is the "*" wildcard expression.
+// NOTE(review): declared as a var, so it can be reassigned by any importer;
+// confirm whether a const was intended.
+var SubAll = "*"
+
+// SubscriptionData describes one topic subscription and its filter.
+type SubscriptionData struct {
+       ClassFilterMode bool
+       Topic           string
+       // SubString is the filter expression text.
+       SubString       string
+       // Tags is used as a set: only key presence is checked by the consumer.
+       Tags            map[string]bool
+       Codes           map[int32]bool
+       SubVersion      int64
+       // ExpType is the language SubString is written in.
+       ExpType         ExpressionType
+}
+
+// consumerData describes one consumer group; it is carried in heartbeatData.
+type consumerData struct {
+       groupName         string
+       cType             consumeType
+       messageModel      MessageModel
+       where             ConsumeFromWhere
+       subscriptionDatas []SubscriptionData
+       unitMode          bool
+}
+
+// heartbeatData bundles a client id with its producer and consumer
+// descriptions.
+// NOTE(review): all fields are unexported, so encoding/json would skip them
+// if this struct is marshalled directly; confirm how it is encoded.
+type heartbeatData struct {
+       clientId      string
+       producerDatas []producerData
+       consumerDatas []consumerData
+}
diff --git a/common/transaction.go b/kernel/mq_version.go
similarity index 67%
copy from common/transaction.go
copy to kernel/mq_version.go
index e526af5..e7d7f32 100644
--- a/common/transaction.go
+++ b/kernel/mq_version.go
@@ -2,20 +2,21 @@
 Licensed to the Apache Software Foundation (ASF) under one or more
 contributor license agreements.  See the NOTICE file distributed with
 this work for additional information regarding copyright ownership.
-The ASF licenses this file to You under the Apache License, Version 2.0
+The ASF licenses this file to You under the Apache License, Version 2.0
 (the "License"); you may not use this file except in compliance with
 the License.  You may obtain a copy of the License at
 
     http://www.apache.org/licenses/LICENSE-2.0
 
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 See the License for the specific language governing permissions and
 limitations under the License.
 */
 
-package common
+package kernel
 
-type TransactionListener interface {
-}
+const (
+       // V4_1_0 is the broker version number that the consumer compares
+       // FindBrokerResult.BrokerVersion against.
+       // NOTE(review): 0 looks like a placeholder; with this value no
+       // non-negative broker version is ever below it. Confirm the real value.
+       V4_1_0 = 0
+)
diff --git a/common/perm.go b/kernel/perm.go
similarity index 54%
copy from common/perm.go
copy to kernel/perm.go
index 13f1e7e..e78a876 100644
--- a/common/perm.go
+++ b/kernel/perm.go
@@ -1,21 +1,21 @@
 /*
  * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
  *
  *     http://www.apache.org/licenses/LICENSE-2.0
  *
  *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  distributed under the License is distributed on an "AS IS" BASIS,
  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
+ *  See the License for the specific language governing permissions and
  *  limitations under the License.
  */
 
-package common
+package kernel
 
 const (
        permPriority = 0x1 << 3
@@ -24,15 +24,15 @@ const (
        permInherit  = 0x1 << 0
 )
 
-func isReadable(perm int) bool {
+func queueIsReadable(perm int) bool {
        return (perm & permRead) == permRead
 }
 
-func isWriteable(perm int) bool {
+func queueIsWriteable(perm int) bool {
        return (perm & permWrite) == permWrite
 }
 
-func isInherited(perm int) bool {
+func queueIsInherited(perm int) bool {
        return (perm & permInherit) == permInherit
 }
 
@@ -42,15 +42,15 @@ func perm2string(perm int) string {
                bytes[i] = '-'
        }
 
-       if isReadable(perm) {
+       if queueIsReadable(perm) {
                bytes[0] = 'R'
        }
 
-       if isWriteable(perm) {
+       if queueIsWriteable(perm) {
                bytes[1] = 'W'
        }
 
-       if isInherited(perm) {
+       if queueIsInherited(perm) {
                bytes[2] = 'X'
        }
 
diff --git a/common/request.go b/kernel/request.go
similarity index 96%
rename from common/request.go
rename to kernel/request.go
index 83d74a0..7967cca 100644
--- a/common/request.go
+++ b/kernel/request.go
@@ -15,7 +15,7 @@ See the License for the specific language governing 
permissions and
 limitations under the License.
 */
 
-package common
+package kernel
 
 const (
        GetRouteInfoByTopic = int16(105)
@@ -54,8 +54,9 @@ type PullMessageRequest struct {
        SysFlag              int32  `json:"sysFlag"`
        CommitOffset         int64  `json:"commitOffset"`
        SuspendTimeoutMillis int64  `json:"suspendTimeoutMillis"`
-       Subscription         string `json:"subscription"`
+       SubExpression        string `json:"subscription"`
        SubVersion           int64  `json:"subVersion"`
+       ExpressionType       string `json:"expressionType"`
 }
 
 type GetMaxOffsetRequest struct {
diff --git a/common/response.go b/kernel/response.go
similarity index 98%
rename from common/response.go
rename to kernel/response.go
index 0fc0101..df1cecc 100644
--- a/common/response.go
+++ b/kernel/response.go
@@ -15,7 +15,7 @@ See the License for the specific language governing 
permissions and
 limitations under the License.
 */
 
-package common
+package kernel
 
 const (
        Success           = int16(0)
diff --git a/common/route.go b/kernel/route.go
similarity index 78%
rename from common/route.go
rename to kernel/route.go
index 4ad3218..38deb93 100644
--- a/common/route.go
+++ b/kernel/route.go
@@ -15,13 +15,12 @@ See the License for the specific language governing 
permissions and
 limitations under the License.
 */
 
-package common
+package kernel
 
 import (
        "encoding/json"
+       "errors"
        "github.com/apache/rocketmq-client-go/remote"
-       "github.com/pkg/errors"
-       log "github.com/sirupsen/logrus"
        "sort"
        "strconv"
        "strings"
@@ -34,7 +33,7 @@ const (
        requestTimeout   = 3000
        defaultTopic     = "TBW102"
        defaultQueueNums = 4
-       masterId         = int64(0)
+       MasterId         = int64(0)
 )
 
 var (
@@ -42,10 +41,15 @@ var (
 )
 
 var (
+       // brokerName -> *BrokerData
        brokerAddressesMap sync.Map
-       publishInfoMap     sync.Map
-       routeDataMap       sync.Map
-       lockNamesrv        sync.Mutex
+
+       // brokerName -> map[string]int (brokerAddr -> version); findBrokerVersion asserts exactly this type
+       brokerVersionMap sync.Map
+
+       publishInfoMap sync.Map
+       routeDataMap   sync.Map
+       lockNamesrv    sync.Mutex
 )
 
 // key is topic, value is TopicPublishInfo
@@ -70,37 +74,7 @@ func (info *TopicPublishInfo) fetchQueueIndex() int {
        return int(qIndex) % length
 }
 
-func tryToFindTopicPublishInfo(topic string) *TopicPublishInfo {
-       value, exist := publishInfoMap.Load(topic)
-
-       var info *TopicPublishInfo
-       if exist {
-               info = value.(*TopicPublishInfo)
-       }
-
-       if info == nil || !info.isOK() {
-               updateTopicRouteInfo(topic)
-               value, exist = publishInfoMap.Load(topic)
-               if !exist {
-                       info = &TopicPublishInfo{HaveTopicRouterInfo: false}
-               } else {
-                       info = value.(*TopicPublishInfo)
-               }
-       }
-
-       if info.HaveTopicRouterInfo || info.isOK() {
-               return info
-       }
-
-       value, exist = publishInfoMap.Load(topic)
-       if exist {
-               return value.(*TopicPublishInfo)
-       }
-
-       return nil
-}
-
-func updateTopicRouteInfo(topic string) {
+func UpdateTopicRouteInfo(topic string) {
        // Todo process lock timeout
        lockNamesrv.Lock()
        defer lockNamesrv.Unlock()
@@ -151,13 +125,91 @@ func updateTopicRouteInfo(topic string) {
        }
 }
 
+// FindBrokerAddressInPublish returns the address stored under MasterId for
+// brokerName in brokerAddressesMap. It returns "" when brokerName has no
+// entry (or when the entry holds no address for MasterId).
+func FindBrokerAddressInPublish(brokerName string) string {
+       if data, ok := brokerAddressesMap.Load(brokerName); ok {
+               return data.(*BrokerData).brokerAddresses[MasterId]
+       }
+       return ""
+}
+
+// FindBrokerAddressInSubscribe selects an address of brokerName to subscribe
+// (pull) from.
+//
+// The address registered under brokerId is preferred. If that id has no
+// address and onlyThisBroker is false, any other non-empty address recorded
+// for the same broker name is used instead. The result's Slave field reports
+// whether the selected address is registered under an id other than MasterId.
+//
+// It returns nil when brokerName is unknown or no acceptable address exists.
+func FindBrokerAddressInSubscribe(brokerName string, brokerId int64, onlyThisBroker bool) *FindBrokerResult {
+       bd, exist := brokerAddressesMap.Load(brokerName)
+       if !exist {
+               return nil
+       }
+       addrs := bd.(*BrokerData).brokerAddresses
+
+       brokerAddr := addrs[brokerId]
+       slave := brokerId != MasterId
+
+       if brokerAddr == "" && !onlyThisBroker {
+               // Fall back to any non-empty address. Map iteration order is
+               // unspecified, so the pick among several candidates is arbitrary.
+               for id, addr := range addrs {
+                       if addr != "" {
+                               brokerAddr = addr
+                               slave = id != MasterId
+                               break
+                       }
+               }
+       }
+
+       if brokerAddr == "" {
+               return nil
+       }
+
+       return &FindBrokerResult{
+               // BrokerAddr carries the selected address (not the broker name);
+               // the same address is the key findBrokerVersion looks up.
+               BrokerAddr:    brokerAddr,
+               Slave:         slave,
+               BrokerVersion: findBrokerVersion(brokerName, brokerAddr),
+       }
+}
+
+// FetchSubscribeMessageQueues asks queryTopicRouteInfoFromServer for the
+// route of topic (3 second timeout) and expands every readable queue entry
+// into one MessageQueue per read-queue id, 0 through readQueueNums-1.
+//
+// A query error is returned unchanged together with a nil slice; on success
+// the slice is non-nil even when it is empty.
+func FetchSubscribeMessageQueues(topic string) ([]*MessageQueue, error) {
+       route, err := queryTopicRouteInfoFromServer(topic, 3*time.Second)
+       if err != nil {
+               return nil, err
+       }
+
+       queues := make([]*MessageQueue, 0)
+       for _, data := range route.queueDataList {
+               // Entries without read permission contribute no queues.
+               if !queueIsReadable(data.perm) {
+                       continue
+               }
+               for id := 0; id < data.readQueueNums; id++ {
+                       queues = append(queues, &MessageQueue{
+                               Topic:      topic,
+                               BrokerName: data.brokerName,
+                               QueueId:    id,
+                       })
+               }
+       }
+       return queues, nil
+}
+
+// findBrokerVersion returns the version stored for brokerAddr in the map
+// kept under brokerName in brokerVersionMap. It returns 0 when either the
+// broker name or the address has no entry.
+func findBrokerVersion(brokerName, brokerAddr string) int {
+       entry, ok := brokerVersionMap.Load(brokerName)
+       if !ok {
+               return 0
+       }
+       // Indexing a map with an absent key yields the zero value, so a missing
+       // address already produces 0 without a separate presence check.
+       return entry.(map[string]int)[brokerAddr]
+}
+
 func queryTopicRouteInfoFromServer(topic string, timeout time.Duration) 
(*topicRouteData, error) {
        request := &GetRouteInfoRequest{
                Topic: topic,
        }
        rc := remote.NewRemotingCommand(GetRouteInfoByTopic, request, nil)
 
-       response, err := client.InvokeSync(getNameServerAddress(), rc, timeout)
+       response, err := remote.InvokeSync(getNameServerAddress(), rc, timeout)
 
        if err != nil {
                return nil, err
@@ -242,7 +294,7 @@ func RouteData2PublishInfo(topic string, data 
*topicRouteData) *TopicPublishInfo
        })
 
        for _, qd := range qds {
-               if !isWriteable(qd.perm) {
+               if !queueIsWriteable(qd.perm) {
                        continue
                }
 
@@ -254,7 +306,7 @@ func RouteData2PublishInfo(topic string, data 
*topicRouteData) *TopicPublishInfo
                        }
                }
 
-               if bData == nil || bData.brokerAddresses[masterId] == "" {
+               if bData == nil || bData.brokerAddresses[MasterId] == "" {
                        continue
                }
 
diff --git a/common/transaction.go b/kernel/transaction.go
similarity index 98%
copy from common/transaction.go
copy to kernel/transaction.go
index e526af5..4895875 100644
--- a/common/transaction.go
+++ b/kernel/transaction.go
@@ -15,7 +15,7 @@ See the License for the specific language governing 
permissions and
 limitations under the License.
 */
 
-package common
+package kernel
 
 type TransactionListener interface {
 }
diff --git a/remote/client.go b/remote/client.go
index 7b1c528..94f5ed2 100644
--- a/remote/client.go
+++ b/remote/client.go
@@ -93,6 +93,18 @@ type RemotingClient interface {
        InvokeOneWay(string, *RemotingCommand) error
 }
 
+// InvokeSync is a package-level placeholder: it ignores addr, request and
+// timeout and always returns (nil, nil).
+// NOTE(review): a caller that only checks err will continue with a nil
+// *RemotingCommand; presumably this is to be wired to a real client. Confirm.
+func InvokeSync(addr string, request *RemotingCommand, timeout time.Duration) 
(*RemotingCommand, error) {
+       return nil, nil
+}
+
+// InvokeAsync is a package-level placeholder: it ignores all arguments,
+// never calls f, and always returns nil.
+// NOTE(review): presumably to be wired to a real client later. Confirm.
+func InvokeAsync(addr string, request *RemotingCommand, timeout time.Duration, 
f func(*RemotingCommand)) error {
+       return nil
+}
+
+// InvokeOneWay is a package-level placeholder: it ignores addr and request
+// and always returns nil.
+func InvokeOneWay(addr string, request *RemotingCommand) error {
+       return nil
+}
+
 //defaultRemotingClient for default RemotingClient implementation
 type defaultRemotingClient struct {
        responseTable    map[int32]*ResponseFuture
diff --git a/common/transaction.go b/utils/helper.go
similarity index 77%
rename from common/transaction.go
rename to utils/helper.go
index e526af5..fe3dd62 100644
--- a/common/transaction.go
+++ b/utils/helper.go
@@ -15,7 +15,18 @@ See the License for the specific language governing 
permissions and
 limitations under the License.
 */
 
-package common
+package utils
 
-type TransactionListener interface {
+import "hash/crc32"
+
+// LocalIP is currently a stub that always returns the empty string.
+func LocalIP() string {
+       return ""
+}
+
+// HashString maps s to its CRC-32 (IEEE polynomial) checksum converted to
+// int. The empty string maps to 0. Distinct strings can share a checksum, so
+// the result is a hash code rather than a unique identifier.
+func HashString(s string) int {
+       // The CRC-32 of empty input is 0, so "" needs no special case.
+       return int(crc32.ChecksumIEEE([]byte(s)))
 }
diff --git a/common/perm.go b/utils/log.go
similarity index 56%
rename from common/perm.go
rename to utils/log.go
index 13f1e7e..d684efd 100644
--- a/common/perm.go
+++ b/utils/log.go
@@ -15,44 +15,28 @@
  *  limitations under the License.
  */
 
-package common
-
-const (
-       permPriority = 0x1 << 3
-       permRead     = 0x1 << 2
-       permWrite    = 0x1 << 1
-       permInherit  = 0x1 << 0
-)
-
-func isReadable(perm int) bool {
-       return (perm & permRead) == permRead
-}
-
-func isWriteable(perm int) bool {
-       return (perm & permWrite) == permWrite
-}
-
-func isInherited(perm int) bool {
-       return (perm & permInherit) == permInherit
-}
-
-func perm2string(perm int) string {
-       bytes := make([]byte, 3)
-       for i := 0; i < 3; i++ {
-               bytes[i] = '-'
-       }
-
-       if isReadable(perm) {
-               bytes[0] = 'R'
-       }
-
-       if isWriteable(perm) {
-               bytes[1] = 'W'
-       }
-
-       if isInherited(perm) {
-               bytes[2] = 'X'
-       }
-
-       return string(bytes)
+package utils
+
+import "io"
+
+// RLog is the package-level Logger. It is declared without an initial value,
+// so it is nil until a Logger implementation is assigned to it.
+var RLog Logger
+
+// Logger is the logging abstraction exposed by this package. It combines
+// output/prefix/header configuration with print methods at several levels
+// (Print, Debug, Info, Warning, Error, Fatal), each available in a plain
+// variadic form and a format-string form. What each level does (for example
+// whether Fatal terminates the process) is left to the implementation.
+type Logger interface {
+       Output() io.Writer
+       SetOutput(w io.Writer)
+       Prefix() string
+       SetPrefix(p string)
+       SetHeader(h string)
+       Print(i ...interface{})
+       Printf(format string, args ...interface{})
+       Debug(i ...interface{})
+       Debugf(format string, args ...interface{})
+       Info(i ...interface{})
+       Infof(format string, args ...interface{})
+       Warning(i ...interface{})
+       Warningf(format string, args ...interface{})
+       Error(i ...interface{})
+       Errorf(format string, args ...interface{})
+       Fatal(i ...interface{})
+       Fatalf(format string, args ...interface{})
 }

Reply via email to