This is an automated email from the ASF dual-hosted git repository.

huzongtang pushed a commit to branch native
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git


The following commit(s) were added to refs/heads/native by this push:
     new 3011efa  Refactor directories & API (#82)
3011efa is described below

commit 3011efa4508e453f14e0ef83647aa0e429e1cb35
Author: wenfeng <[email protected]>
AuthorDate: Tue Jul 2 23:40:15 2019 +0800

    Refactor directories & API (#82)
    
    * refactor directorie & apiss
    
    * fix ci failed
---
 api.go                                             |  54 ++++
 consumer/strategy.go                               | 130 ---------
 docs/Introduction.md                               |  12 +-
 examples/consumer/main.go                          |  16 +-
 examples/producer/main.go                          |   8 +-
 {consumer => internal/consumer}/consumer.go        | 301 ++++-----------------
 {consumer => internal/consumer}/consumer_test.go   |   0
 {consumer => internal/consumer}/offset_store.go    |  33 +--
 {consumer => internal/consumer}/process_queue.go   |  24 +-
 {consumer => internal/consumer}/pull_consumer.go   |  50 ++--
 .../consumer}/pull_consumer_test.go                |   0
 {consumer => internal/consumer}/push_consumer.go   | 103 +++----
 .../consumer}/push_consumer_test.go                |   0
 {consumer => internal/consumer}/statistics.go      |   0
 {kernel => internal/kernel}/client.go              |  83 +++---
 {kernel => internal/kernel}/client_test.go         |   0
 {kernel => internal/kernel}/constants.go           |   0
 internal/kernel/model.go                           |  80 ++++++
 {kernel => internal/kernel}/mq_version.go          |   0
 {kernel => internal/kernel}/perm.go                |   0
 {kernel => internal/kernel}/request.go             |   0
 {kernel => internal/kernel}/response.go            |   0
 {kernel => internal/kernel}/route.go               |  15 +-
 {kernel => internal/kernel}/route_test.go          |   0
 {kernel => internal/kernel}/transaction.go         |   0
 {kernel => internal/kernel}/validators.go          |   0
 {producer => internal/producer}/producer.go        |  29 +-
 {remote => internal/remote}/codec.go               |   0
 {remote => internal/remote}/codec_test.go          |   0
 {remote => internal/remote}/remote_client.go       |   0
 {remote => internal/remote}/remote_client_test.go  |   0
 {remote => internal/remote}/rpchook.go             |   0
 primitive/consume.go                               | 128 +++++++++
 {kernel => primitive}/message.go                   |  32 ++-
 primitive/options.go                               | 132 +++++++++
 kernel/model.go => primitive/result.go             |  86 +-----
 primitive/strategy.go                              | 117 ++++++++
 37 files changed, 760 insertions(+), 673 deletions(-)

diff --git a/api.go b/api.go
new file mode 100644
index 0000000..5043f95
--- /dev/null
+++ b/api.go
@@ -0,0 +1,54 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package rocketmq
+
+import (
+       "context"
+       "github.com/apache/rocketmq-client-go/primitive"
+)
+
+type Producer interface {
+       Start() error
+       Shutdown() error
+       SendSync(context.Context, ...*primitive.Message) (primitive.SendResult, 
error)
+       SendAsync(context.Context, func(primitive.SendResult), 
...*primitive.Message) error
+       SendOneWay(context.Context, ...*primitive.Message) error
+}
+
+func NewProducer(opt primitive.ProducerOptions) (Producer, error) {
+       return nil, nil
+}
+
+type PushConsumer interface {
+       Start() error
+       Shutdown() error
+       Subscribe(topic string, selector primitive.MessageSelector,
+               f func(context.Context, ...*primitive.MessageExt) 
(primitive.ConsumeResult, error)) error
+       Unsubscribe(string) error
+}
+
+type PullConsumer interface {
+       Start() error
+       Shutdown() error
+       Pull(context.Context, string, primitive.MessageSelector, int) 
(primitive.PullResult, error)
+       PullFrom(context.Context, primitive.MessageQueue, int64, int) 
(primitive.PullResult, error)
+       // only update in memory
+       UpdateOffset(primitive.MessageQueue, int64) error
+       PersistOffset(context.Context) error
+       CurrentOffset(primitive.MessageQueue) int64
+}
diff --git a/consumer/strategy.go b/consumer/strategy.go
deleted file mode 100644
index 1cb6b2b..0000000
--- a/consumer/strategy.go
+++ /dev/null
@@ -1,130 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one or more
-contributor license agreements.  See the NOTICE file distributed with
-this work for additional information regarding copyright ownership.
-The ASF licenses this file to You under the Apache License, Version 2.0
-(the "License"); you may not use this file except in compliance with
-the License.  You may obtain a copy of the License at
-
-    http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-*/
-
-package consumer
-
-import (
-       "github.com/apache/rocketmq-client-go/kernel"
-       "github.com/apache/rocketmq-client-go/rlog"
-       "github.com/apache/rocketmq-client-go/utils"
-)
-
-// Strategy Algorithm for message allocating between consumers
-type AllocateStrategy string
-
-const (
-       // An allocate strategy proxy for based on machine room nearside 
priority. An actual allocate strategy can be
-       // specified.
-       //
-       // If any consumer is alive in a machine room, the message queue of the 
broker which is deployed in the same machine
-       // should only be allocated to those. Otherwise, those message queues 
can be shared along all consumers since there are
-       // no alive consumer to monopolize them.
-       StrategyMachineNearby = AllocateStrategy("MachineNearby")
-
-       // Average Hashing queue algorithm
-       StrategyAveragely = AllocateStrategy("Averagely")
-
-       // Cycle average Hashing queue algorithm
-       StrategyAveragelyCircle = AllocateStrategy("AveragelyCircle")
-
-       // Use Message Queue specified
-       StrategyConfig = AllocateStrategy("Config")
-
-       // Computer room Hashing queue algorithm, such as Alipay logic room
-       StrategyMachineRoom = AllocateStrategy("MachineRoom")
-
-       // Consistent Hashing queue algorithm
-       StrategyConsistentHash = AllocateStrategy("ConsistentHash")
-)
-
-func allocateByAveragely(consumerGroup, currentCID string, mqAll 
[]*kernel.MessageQueue,
-       cidAll []string) []*kernel.MessageQueue {
-       if currentCID == "" || utils.IsArrayEmpty(mqAll) || 
utils.IsArrayEmpty(cidAll) {
-               return nil
-       }
-       var (
-               find  bool
-               index int
-       )
-
-       for idx := range cidAll {
-               if cidAll[idx] == currentCID {
-                       find = true
-                       index = idx
-                       break
-               }
-       }
-       if !find {
-               rlog.Infof("[BUG] ConsumerGroup=%s, ConsumerId=%s not in 
cidAll:%+v", consumerGroup, currentCID, cidAll)
-               return nil
-       }
-
-       mqSize := len(mqAll)
-       cidSize := len(cidAll)
-       mod := mqSize % cidSize
-
-       var averageSize int
-       if mqSize <= cidSize {
-               averageSize = 1
-       } else {
-               if mod > 0 && index < mod {
-                       averageSize = mqSize/cidSize + 1
-               } else {
-                       averageSize = mqSize / cidSize
-               }
-       }
-
-       var startIndex int
-       if mod > 0 && index < mod {
-               startIndex = index * averageSize
-       } else {
-               startIndex = index*averageSize + mod
-       }
-
-       num := utils.MinInt(averageSize, mqSize-startIndex)
-       result := make([]*kernel.MessageQueue, num)
-       for i := 0; i < num; i++ {
-               result[i] = mqAll[(startIndex+i)%mqSize]
-       }
-       return result
-}
-
-// TODO
-func allocateByMachineNearby(consumerGroup, currentCID string, mqAll 
[]*kernel.MessageQueue,
-       cidAll []string) []*kernel.MessageQueue {
-       return allocateByAveragely(consumerGroup, currentCID, mqAll, cidAll)
-}
-
-func allocateByAveragelyCircle(consumerGroup, currentCID string, mqAll 
[]*kernel.MessageQueue,
-       cidAll []string) []*kernel.MessageQueue {
-       return allocateByAveragely(consumerGroup, currentCID, mqAll, cidAll)
-}
-
-func allocateByConfig(consumerGroup, currentCID string, mqAll 
[]*kernel.MessageQueue,
-       cidAll []string) []*kernel.MessageQueue {
-       return allocateByAveragely(consumerGroup, currentCID, mqAll, cidAll)
-}
-
-func allocateByMachineRoom(consumerGroup, currentCID string, mqAll 
[]*kernel.MessageQueue,
-       cidAll []string) []*kernel.MessageQueue {
-       return allocateByAveragely(consumerGroup, currentCID, mqAll, cidAll)
-}
-
-func allocateByConsistentHash(consumerGroup, currentCID string, mqAll 
[]*kernel.MessageQueue,
-       cidAll []string) []*kernel.MessageQueue {
-       return allocateByAveragely(consumerGroup, currentCID, mqAll, cidAll)
-}
diff --git a/docs/Introduction.md b/docs/Introduction.md
index 9bc22c7..0db6e7b 100644
--- a/docs/Introduction.md
+++ b/docs/Introduction.md
@@ -20,8 +20,8 @@ rlog.SetLogger(Logger)
 Producer interface {
        Start() error
        Shutdown() error
-       SendSync(context.Context, *kernel.Message) (*kernel.SendResult, error)
-       SendOneWay(context.Context, *kernel.Message) error
+       SendSync(context.Context, *primitive.Message) (*kernel.SendResult, 
error)
+       SendOneWay(context.Context, *primitive.Message) error
 }
 ```
 
@@ -42,7 +42,7 @@ err := p.Start()
 
 - send message with sync
 ```go
-result, err := p.SendSync(context.Background(), &kernel.Message{
+result, err := p.SendSync(context.Background(), &primitive.Message{
     Topic: "test",
     Body:  []byte("Hello RocketMQ Go Client!"),
 })
@@ -52,7 +52,7 @@ result, err := p.SendSync(context.Background(), 
&kernel.Message{
 
 - or send message with oneway
 ```go 
-err := p.SendOneWay(context.Background(), &kernel.Message{
+err := p.SendOneWay(context.Background(), &primitive.Message{
     Topic: "test",
     Body:  []byte("Hello RocketMQ Go Client!"),
 })
@@ -68,7 +68,7 @@ PushConsumer interface {
        Start() error
        Shutdown()
        Subscribe(topic string, selector MessageSelector,
-               f func(*ConsumeMessageContext, []*kernel.MessageExt) 
(ConsumeResult, error)) error
+               f func(*ConsumeMessageContext, []*primitive.MessageExt) 
(ConsumeResult, error)) error
 }
 ```
 
@@ -85,7 +85,7 @@ c := consumer.NewPushConsumer("testGroup", 
consumer.ConsumerOption{
 - Subscribe a topic(only support one topic now), and define your consuming 
function
 ```go
 err := c.Subscribe("test", consumer.MessageSelector{}, func(ctx 
*consumer.ConsumeMessageContext,
-    msgs []*kernel.MessageExt) (consumer.ConsumeResult, error) {
+    msgs []*primitive.MessageExt) (consumer.ConsumeResult, error) {
     fmt.Println(msgs)
     return consumer.ConsumeSuccess, nil
 })
diff --git a/examples/consumer/main.go b/examples/consumer/main.go
index f660d43..65b0ba3 100644
--- a/examples/consumer/main.go
+++ b/examples/consumer/main.go
@@ -19,22 +19,22 @@ package main
 
 import (
        "fmt"
-       "github.com/apache/rocketmq-client-go/consumer"
-       "github.com/apache/rocketmq-client-go/kernel"
+       "github.com/apache/rocketmq-client-go/internal/consumer"
+       "github.com/apache/rocketmq-client-go/primitive"
        "os"
        "time"
 )
 
 func main() {
-       c, _ := consumer.NewPushConsumer("testGroup", consumer.ConsumerOption{
+       c, _ := consumer.NewPushConsumer("testGroup", primitive.ConsumerOption{
                NameServerAddr: "127.0.0.1:9876",
-               ConsumerModel:  consumer.Clustering,
-               FromWhere:      consumer.ConsumeFromFirstOffset,
+               ConsumerModel:  primitive.Clustering,
+               FromWhere:      primitive.ConsumeFromFirstOffset,
        })
-       err := c.Subscribe("test", consumer.MessageSelector{}, func(ctx 
*consumer.ConsumeMessageContext,
-               msgs []*kernel.MessageExt) (consumer.ConsumeResult, error) {
+       err := c.Subscribe("test", primitive.MessageSelector{}, func(ctx 
*consumer.ConsumeMessageContext,
+               msgs []*primitive.MessageExt) (primitive.ConsumeResult, error) {
                fmt.Println(msgs)
-               return consumer.ConsumeSuccess, nil
+               return primitive.ConsumeSuccess, nil
        })
        if err != nil {
                fmt.Println(err.Error())
diff --git a/examples/producer/main.go b/examples/producer/main.go
index 9cc626b..c3e338d 100644
--- a/examples/producer/main.go
+++ b/examples/producer/main.go
@@ -20,13 +20,13 @@ package main
 import (
        "context"
        "fmt"
-       "github.com/apache/rocketmq-client-go/kernel"
-       "github.com/apache/rocketmq-client-go/producer"
+       "github.com/apache/rocketmq-client-go/internal/producer"
+       "github.com/apache/rocketmq-client-go/primitive"
        "os"
 )
 
 func main() {
-       opt := producer.ProducerOptions{
+       opt := primitive.ProducerOptions{
                NameServerAddr:           "127.0.0.1:9876",
                RetryTimesWhenSendFailed: 2,
        }
@@ -37,7 +37,7 @@ func main() {
                os.Exit(1)
        }
        for i := 0; i < 1000; i++ {
-               res, err := p.SendSync(context.Background(), &kernel.Message{
+               res, err := p.SendSync(context.Background(), &primitive.Message{
                        Topic: "test",
                        Body:  []byte("Hello RocketMQ Go Client!"),
                })
diff --git a/consumer/consumer.go b/internal/consumer/consumer.go
similarity index 66%
rename from consumer/consumer.go
rename to internal/consumer/consumer.go
index 80e4fb5..08ae664 100644
--- a/consumer/consumer.go
+++ b/internal/consumer/consumer.go
@@ -20,8 +20,9 @@ package consumer
 import (
        "encoding/json"
        "fmt"
-       "github.com/apache/rocketmq-client-go/kernel"
-       "github.com/apache/rocketmq-client-go/remote"
+       "github.com/apache/rocketmq-client-go/internal/kernel"
+       "github.com/apache/rocketmq-client-go/internal/remote"
+       "github.com/apache/rocketmq-client-go/primitive"
        "github.com/apache/rocketmq-client-go/rlog"
        "github.com/apache/rocketmq-client-go/utils"
        "github.com/tidwall/gjson"
@@ -53,135 +54,18 @@ const (
        _PersistConsumerOffsetInterval = 5 * time.Second
 )
 
-// Message model defines the way how messages are delivered to each consumer 
clients.
-// </p>
-//
-// RocketMQ supports two message models: clustering and broadcasting. If 
clustering is set, consumer clients with
-// the same {@link #consumerGroup} would only consume shards of the messages 
subscribed, which achieves load
-// balances; Conversely, if the broadcasting is set, each consumer client will 
consume all subscribed messages
-// separately.
-// </p>
-//
-// This field defaults to clustering.
-type MessageModel int
-
-const (
-       BroadCasting MessageModel = iota
-       Clustering
-)
-
-func (mode MessageModel) String() string {
-       switch mode {
-       case BroadCasting:
-               return "BroadCasting"
-       case Clustering:
-               return "Clustering"
-       default:
-               return "Unknown"
-       }
-}
-
-// Consuming point on consumer booting.
-// </p>
-//
-// There are three consuming points:
-// <ul>
-// <li>
-// <code>CONSUME_FROM_LAST_OFFSET</code>: consumer clients pick up where it 
stopped previously.
-// If it were a newly booting up consumer client, according aging of the 
consumer group, there are two
-// cases:
-// <ol>
-// <li>
-// if the consumer group is created so recently that the earliest message 
being subscribed has yet
-// expired, which means the consumer group represents a lately launched 
business, consuming will
-// start from the very beginning;
-// </li>
-// <li>
-// if the earliest message being subscribed has expired, consuming will start 
from the latest
-// messages, meaning messages born prior to the booting timestamp would be 
ignored.
-// </li>
-// </ol>
-// </li>
-// <li>
-// <code>CONSUME_FROM_FIRST_OFFSET</code>: Consumer client will start from 
earliest messages available.
-// </li>
-// <li>
-// <code>CONSUME_FROM_TIMESTAMP</code>: Consumer client will start from 
specified timestamp, which means
-// messages born prior to {@link #consumeTimestamp} will be ignored
-// </li>
-// </ul>
-type ConsumeFromWhere int
-
-const (
-       ConsumeFromLastOffset ConsumeFromWhere = iota
-       ConsumeFromFirstOffset
-       ConsumeFromTimestamp
-)
-
 type ConsumeType string
 
 const (
        _PullConsume = ConsumeType("pull")
        _PushConsume = ConsumeType("push")
-)
-
-type ExpressionType string
-
-const (
-       /**
-        * <ul>
-        * Keywords:
-        * <li>{@code AND, OR, NOT, BETWEEN, IN, TRUE, FALSE, IS, NULL}</li>
-        * </ul>
-        * <p/>
-        * <ul>
-        * Data type:
-        * <li>Boolean, like: TRUE, FALSE</li>
-        * <li>String, like: 'abc'</li>
-        * <li>Decimal, like: 123</li>
-        * <li>Float number, like: 3.1415</li>
-        * </ul>
-        * <p/>
-        * <ul>
-        * Grammar:
-        * <li>{@code AND, OR}</li>
-        * <li>{@code >, >=, <, <=, =}</li>
-        * <li>{@code BETWEEN A AND B}, equals to {@code >=A AND <=B}</li>
-        * <li>{@code NOT BETWEEN A AND B}, equals to {@code >B OR <A}</li>
-        * <li>{@code IN ('a', 'b')}, equals to {@code ='a' OR ='b'}, this 
operation only support String type.</li>
-        * <li>{@code IS NULL}, {@code IS NOT NULL}, check parameter whether is 
null, or not.</li>
-        * <li>{@code =TRUE}, {@code =FALSE}, check parameter whether is true, 
or false.</li>
-        * </ul>
-        * <p/>
-        * <p>
-        * Example:
-        * (a > 10 AND a < 100) OR (b IS NOT NULL AND b=TRUE)
-        * </p>
-        */
-       SQL92 = ExpressionType("SQL92")
-
-       /**
-        * Only support or operation such as
-        * "tag1 || tag2 || tag3", <br>
-        * If null or * expression, meaning subscribe all.
-        */
-       TAG = ExpressionType("TAG")
-)
-
-func IsTagType(exp string) bool {
-       if exp == "" || exp == "TAG" {
-               return true
-       }
-       return false
-}
 
-const (
        _SubAll = "*"
 )
 
 type PullRequest struct {
        consumerGroup string
-       mq            *kernel.MessageQueue
+       mq            *primitive.MessageQueue
        pq            *processQueue
        nextOffset    int64
        lockedFirst   bool
@@ -192,83 +76,6 @@ func (pr *PullRequest) String() string {
                pr.consumerGroup, pr.mq.Topic, pr.mq.QueueId)
 }
 
-type ConsumerOption struct {
-       kernel.ClientOption
-       NameServerAddr string
-
-       /**
-        * Backtracking consumption time with second precision. Time format is
-        * 20131223171201<br>
-        * Implying Seventeen twelve and 01 seconds on December 23, 2013 
year<br>
-        * Default backtracking consumption time Half an hour ago.
-        */
-       ConsumeTimestamp string
-
-       // The socket timeout in milliseconds
-       ConsumerPullTimeout time.Duration
-
-       // Concurrently max span offset.it has no effect on sequential 
consumption
-       ConsumeConcurrentlyMaxSpan int
-
-       // Flow control threshold on queue level, each message queue will cache 
at most 1000 messages by default,
-       // Consider the {PullBatchSize}, the instantaneous value may exceed the 
limit
-       PullThresholdForQueue int64
-
-       // Limit the cached message size on queue level, each message queue 
will cache at most 100 MiB messages by default,
-       // Consider the {@code pullBatchSize}, the instantaneous value may 
exceed the limit
-       //
-       // The size of a message only measured by message body, so it's not 
accurate
-       PullThresholdSizeForQueue int
-
-       // Flow control threshold on topic level, default value is -1(Unlimited)
-       //
-       // The value of {@code pullThresholdForQueue} will be overwrote and 
calculated based on
-       // {@code pullThresholdForTopic} if it is't unlimited
-       //
-       // For example, if the value of pullThresholdForTopic is 1000 and 10 
message queues are assigned to this consumer,
-       // then pullThresholdForQueue will be set to 100
-       PullThresholdForTopic int
-
-       // Limit the cached message size on topic level, default value is -1 
MiB(Unlimited)
-       //
-       // The value of {@code pullThresholdSizeForQueue} will be overwrote and 
calculated based on
-       // {@code pullThresholdSizeForTopic} if it is't unlimited
-       //
-       // For example, if the value of pullThresholdSizeForTopic is 1000 MiB 
and 10 message queues are
-       // assigned to this consumer, then pullThresholdSizeForQueue will be 
set to 100 MiB
-       PullThresholdSizeForTopic int
-
-       // Message pull Interval
-       PullInterval time.Duration
-
-       // Batch consumption size
-       ConsumeMessageBatchMaxSize int
-
-       // Batch pull size
-       PullBatchSize int32
-
-       // Whether update subscription relationship when every pull
-       PostSubscriptionWhenPull bool
-
-       // Max re-consume times. -1 means 16 times.
-       //
-       // If messages are re-consumed more than {@link #maxReconsumeTimes} 
before success, it's be directed to a deletion
-       // queue waiting.
-       MaxReconsumeTimes int
-
-       // Suspending pulling time for cases requiring slow pulling like 
flow-control scenario.
-       SuspendCurrentQueueTimeMillis time.Duration
-
-       // Maximum amount of time a message may block the consuming thread.
-       ConsumeTimeout time.Duration
-
-       ConsumerModel  MessageModel
-       Strategy       AllocateStrategy
-       ConsumeOrderly bool
-       FromWhere      ConsumeFromWhere
-       // TODO traceDispatcher
-}
-
 // TODO hook
 type defaultConsumer struct {
        /**
@@ -279,25 +86,25 @@ type defaultConsumer struct {
         * See <a href="http://rocketmq.apache.org/docs/core-concept/";>here</a> 
for further discussion.
         */
        consumerGroup  string
-       model          MessageModel
-       allocate       func(string, string, []*kernel.MessageQueue, []string) 
[]*kernel.MessageQueue
+       model          primitive.MessageModel
+       allocate       func(string, string, []*primitive.MessageQueue, 
[]string) []*primitive.MessageQueue
        unitMode       bool
        consumeOrderly bool
-       fromWhere      ConsumeFromWhere
+       fromWhere      primitive.ConsumeFromWhere
 
        cType     ConsumeType
        client    *kernel.RMQClient
-       mqChanged func(topic string, mqAll, mqDivided []*kernel.MessageQueue)
+       mqChanged func(topic string, mqAll, mqDivided []*primitive.MessageQueue)
        state     kernel.ServiceState
        pause     bool
        once      sync.Once
-       option    ConsumerOption
-       // key: int, hash(*kernel.MessageQueue)
+       option    primitive.ConsumerOption
+       // key: int, hash(*primitive.MessageQueue)
        // value: *processQueue
        processQueueTable sync.Map
 
        // key: topic(string)
-       // value: map[int]*kernel.MessageQueue
+       // value: map[int]*primitive.MessageQueue
        topicSubscribeInfoTable sync.Map
 
        // key: topic
@@ -314,19 +121,19 @@ func (dc *defaultConsumer) persistConsumerOffset() {
                rlog.Errorf("consumer state error: %s", err.Error())
                return
        }
-       mqs := make([]*kernel.MessageQueue, 0)
+       mqs := make([]*primitive.MessageQueue, 0)
        dc.processQueueTable.Range(func(key, value interface{}) bool {
-               mqs = append(mqs, key.(*kernel.MessageQueue))
+               mqs = append(mqs, key.(*primitive.MessageQueue))
                return true
        })
        dc.storage.persist(mqs)
 }
 
-func (dc *defaultConsumer) updateTopicSubscribeInfo(topic string, mqs 
[]*kernel.MessageQueue) {
+func (dc *defaultConsumer) updateTopicSubscribeInfo(topic string, mqs 
[]*primitive.MessageQueue) {
        _, exist := dc.subscriptionDataTable.Load(topic)
        // does subscribe, if true, replace it
        if exist {
-               mqSet := make(map[int]*kernel.MessageQueue, 0)
+               mqSet := make(map[int]*primitive.MessageQueue, 0)
                for idx := range mqs {
                        mq := mqs[idx]
                        mqSet[mq.HashCode()] = mq
@@ -355,23 +162,23 @@ func (dc *defaultConsumer) doBalance() {
                        rlog.Warnf("do balance of group: %s, but topic: %s does 
not exist.", dc.consumerGroup, topic)
                        return true
                }
-               mqs := v.([]*kernel.MessageQueue)
+               mqs := v.([]*primitive.MessageQueue)
                switch dc.model {
-               case BroadCasting:
+               case primitive.BroadCasting:
                        changed := dc.updateProcessQueueTable(topic, mqs)
                        if changed {
                                dc.mqChanged(topic, mqs, mqs)
                                rlog.Infof("messageQueueChanged, Group: %s, 
Topic: %s, MessageQueues: %v",
                                        dc.consumerGroup, topic, mqs)
                        }
-               case Clustering:
+               case primitive.Clustering:
                        cidAll := dc.findConsumerList(topic)
                        if cidAll == nil {
                                rlog.Warnf("do balance for Group: %s, Topic: %s 
get consumer id list failed",
                                        dc.consumerGroup, topic)
                                return true
                        }
-                       mqAll := make([]*kernel.MessageQueue, len(mqs))
+                       mqAll := make([]*primitive.MessageQueue, len(mqs))
                        copy(mqAll, mqs)
                        sort.Strings(cidAll)
                        sort.SliceStable(mqAll, func(i, j int) bool {
@@ -390,9 +197,9 @@ func (dc *defaultConsumer) doBalance() {
                        changed := dc.updateProcessQueueTable(topic, 
allocateResult)
                        if changed {
                                dc.mqChanged(topic, mqAll, allocateResult)
-                               rlog.Infof("do balance result changed, 
allocateMessageQueueStrategyName=%s, group=%s, "+
+                               rlog.Infof("do balance result changed, 
group=%s, "+
                                        "topic=%s, clientId=%s, mqAllSize=%d, 
cidAllSize=%d, rebalanceResultSize=%d, "+
-                                       "rebalanceResultSet=%v", 
string(dc.option.Strategy), dc.consumerGroup, topic, dc.client.ClientID(), 
len(mqAll),
+                                       "rebalanceResultSet=%v", 
dc.consumerGroup, topic, dc.client.ClientID(), len(mqAll),
                                        len(cidAll), len(allocateResult), 
allocateResult)
 
                        }
@@ -418,12 +225,12 @@ func (dc *defaultConsumer) makeSureStateOK() error {
 }
 
 type lockBatchRequestBody struct {
-       ConsumerGroup string                 `json:"consumerGroup"`
-       ClientId      string                 `json:"clientId"`
-       MQs           []*kernel.MessageQueue `json:"mqSet"`
+       ConsumerGroup string                    `json:"consumerGroup"`
+       ClientId      string                    `json:"clientId"`
+       MQs           []*primitive.MessageQueue `json:"mqSet"`
 }
 
-func (dc *defaultConsumer) lock(mq *kernel.MessageQueue) bool {
+func (dc *defaultConsumer) lock(mq *primitive.MessageQueue) bool {
        brokerResult := kernel.FindBrokerAddressInSubscribe(mq.BrokerName, 
kernel.MasterId, true)
 
        if brokerResult == nil {
@@ -433,7 +240,7 @@ func (dc *defaultConsumer) lock(mq *kernel.MessageQueue) 
bool {
        body := &lockBatchRequestBody{
                ConsumerGroup: dc.consumerGroup,
                ClientId:      dc.client.ClientID(),
-               MQs:           []*kernel.MessageQueue{mq},
+               MQs:           []*primitive.MessageQueue{mq},
        }
        lockedMQ := dc.doLock(brokerResult.BrokerAddr, body)
        var lockOK bool
@@ -453,7 +260,7 @@ func (dc *defaultConsumer) lock(mq *kernel.MessageQueue) 
bool {
        return lockOK
 }
 
-func (dc *defaultConsumer) unlock(mq *kernel.MessageQueue, oneway bool) {
+func (dc *defaultConsumer) unlock(mq *primitive.MessageQueue, oneway bool) {
        brokerResult := kernel.FindBrokerAddressInSubscribe(mq.BrokerName, 
kernel.MasterId, true)
 
        if brokerResult == nil {
@@ -463,14 +270,14 @@ func (dc *defaultConsumer) unlock(mq 
*kernel.MessageQueue, oneway bool) {
        body := &lockBatchRequestBody{
                ConsumerGroup: dc.consumerGroup,
                ClientId:      dc.client.ClientID(),
-               MQs:           []*kernel.MessageQueue{mq},
+               MQs:           []*primitive.MessageQueue{mq},
        }
        dc.doUnlock(brokerResult.BrokerAddr, body, oneway)
        rlog.Warnf("unlock messageQueue. group:%s, clientId:%s, mq:%s",
                dc.consumerGroup, dc.client.ClientID(), mq.String())
 }
 
-func (dc *defaultConsumer) lockAll(mq kernel.MessageQueue) {
+func (dc *defaultConsumer) lockAll(mq primitive.MessageQueue) {
        mqMapSet := dc.buildProcessQueueTableByBrokerName()
        for broker, mqs := range mqMapSet {
                if len(mqs) == 0 {
@@ -539,7 +346,7 @@ func (dc *defaultConsumer) unlockAll(oneway bool) {
        }
 }
 
-func (dc *defaultConsumer) doLock(addr string, body *lockBatchRequestBody) 
[]kernel.MessageQueue {
+func (dc *defaultConsumer) doLock(addr string, body *lockBatchRequestBody) 
[]primitive.MessageQueue {
        data, _ := json.Marshal(body)
        request := remote.NewRemotingCommand(kernel.ReqLockBatchMQ, nil, data)
        response, err := dc.client.InvokeSync(addr, request, 1*time.Second)
@@ -548,7 +355,7 @@ func (dc *defaultConsumer) doLock(addr string, body 
*lockBatchRequestBody) []ker
                return nil
        }
        lockOKMQSet := struct {
-               MQs []kernel.MessageQueue `json:"lockOKMQSet"`
+               MQs []primitive.MessageQueue `json:"lockOKMQSet"`
        }{}
        err = json.Unmarshal(response.Body, &lockOKMQSet)
        if err != nil {
@@ -577,14 +384,14 @@ func (dc *defaultConsumer) doUnlock(addr string, body 
*lockBatchRequestBody, one
        }
 }
 
-func (dc *defaultConsumer) buildProcessQueueTableByBrokerName() 
map[string][]*kernel.MessageQueue {
-       result := make(map[string][]*kernel.MessageQueue, 0)
+func (dc *defaultConsumer) buildProcessQueueTableByBrokerName() 
map[string][]*primitive.MessageQueue {
+       result := make(map[string][]*primitive.MessageQueue, 0)
 
        dc.processQueueTable.Range(func(key, value interface{}) bool {
-               mq := key.(*kernel.MessageQueue)
+               mq := key.(*primitive.MessageQueue)
                mqs, exist := result[mq.BrokerName]
                if !exist {
-                       mqs = make([]*kernel.MessageQueue, 0)
+                       mqs = make([]*primitive.MessageQueue, 0)
                }
                mqs = append(mqs, mq)
                result[mq.BrokerName] = mqs
@@ -595,15 +402,15 @@ func (dc *defaultConsumer) 
buildProcessQueueTableByBrokerName() map[string][]*ke
 }
 
 // TODO 问题不少 需要再好好对一下
-func (dc *defaultConsumer) updateProcessQueueTable(topic string, mqs 
[]*kernel.MessageQueue) bool {
+func (dc *defaultConsumer) updateProcessQueueTable(topic string, mqs 
[]*primitive.MessageQueue) bool {
        var changed bool
-       mqSet := make(map[*kernel.MessageQueue]bool)
+       mqSet := make(map[*primitive.MessageQueue]bool)
        for idx := range mqs {
                mqSet[mqs[idx]] = true
        }
        // TODO
        dc.processQueueTable.Range(func(key, value interface{}) bool {
-               mq := key.(*kernel.MessageQueue)
+               mq := key.(*primitive.MessageQueue)
                pq := value.(*processQueue)
                if mq.Topic == topic {
                        if !mqSet[mq] {
@@ -666,13 +473,13 @@ func (dc *defaultConsumer) updateProcessQueueTable(topic 
string, mqs []*kernel.M
        return changed
 }
 
-func (dc *defaultConsumer) removeUnnecessaryMessageQueue(mq 
*kernel.MessageQueue, pq *processQueue) bool {
-       dc.storage.persist([]*kernel.MessageQueue{mq})
+func (dc *defaultConsumer) removeUnnecessaryMessageQueue(mq 
*primitive.MessageQueue, pq *processQueue) bool {
+       dc.storage.persist([]*primitive.MessageQueue{mq})
        dc.storage.remove(mq)
        return true
 }
 
-func (dc *defaultConsumer) computePullFromWhere(mq *kernel.MessageQueue) int64 
{
+func (dc *defaultConsumer) computePullFromWhere(mq *primitive.MessageQueue) 
int64 {
        if dc.cType == _PullConsume {
                return 0
        }
@@ -682,7 +489,7 @@ func (dc *defaultConsumer) computePullFromWhere(mq 
*kernel.MessageQueue) int64 {
                result = lastOffset
        } else {
                switch dc.fromWhere {
-               case ConsumeFromLastOffset:
+               case primitive.ConsumeFromLastOffset:
                        if lastOffset == -1 {
                                if strings.HasPrefix(mq.Topic, 
kernel.RetryGroupTopicPrefix) {
                                        lastOffset = 0
@@ -697,11 +504,11 @@ func (dc *defaultConsumer) computePullFromWhere(mq 
*kernel.MessageQueue) int64 {
                        } else {
                                result = -1
                        }
-               case ConsumeFromFirstOffset:
+               case primitive.ConsumeFromFirstOffset:
                        if lastOffset == -1 {
                                result = 0
                        }
-               case ConsumeFromTimestamp:
+               case primitive.ConsumeFromTimestamp:
                        if lastOffset == -1 {
                                if strings.HasPrefix(mq.Topic, 
kernel.RetryGroupTopicPrefix) {
                                        lastOffset, err := dc.queryMaxOffset(mq)
@@ -760,12 +567,12 @@ func (dc *defaultConsumer) findConsumerList(topic string) 
[]string {
        return nil
 }
 
-func (dc *defaultConsumer) sendBack(msg *kernel.MessageExt, level int) error {
+func (dc *defaultConsumer) sendBack(msg *primitive.MessageExt, level int) 
error {
        return nil
 }
 
 // QueryMaxOffset with specific queueId and topic
-func (dc *defaultConsumer) queryMaxOffset(mq *kernel.MessageQueue) (int64, 
error) {
+func (dc *defaultConsumer) queryMaxOffset(mq *primitive.MessageQueue) (int64, 
error) {
        brokerAddr := kernel.FindBrokerAddrByName(mq.BrokerName)
        if brokerAddr == "" {
                kernel.UpdateTopicRouteInfo(mq.Topic)
@@ -790,7 +597,7 @@ func (dc *defaultConsumer) queryMaxOffset(mq 
*kernel.MessageQueue) (int64, error
 }
 
 // SearchOffsetByTimestamp with specific queueId and topic
-func (dc *defaultConsumer) searchOffsetByTimestamp(mq *kernel.MessageQueue, 
timestamp int64) (int64, error) {
+func (dc *defaultConsumer) searchOffsetByTimestamp(mq *primitive.MessageQueue, 
timestamp int64) (int64, error) {
        brokerAddr := kernel.FindBrokerAddrByName(mq.BrokerName)
        if brokerAddr == "" {
                kernel.UpdateTopicRouteInfo(mq.Topic)
@@ -815,19 +622,19 @@ func (dc *defaultConsumer) searchOffsetByTimestamp(mq 
*kernel.MessageQueue, time
        return strconv.ParseInt(response.ExtFields["offset"], 10, 64)
 }
 
-func buildSubscriptionData(topic string, selector MessageSelector) 
*kernel.SubscriptionData {
+func buildSubscriptionData(topic string, selector primitive.MessageSelector) 
*kernel.SubscriptionData {
        subData := &kernel.SubscriptionData{
                Topic:     topic,
                SubString: selector.Expression,
                ExpType:   string(selector.Type),
        }
 
-       if selector.Type != "" && selector.Type != TAG {
+       if selector.Type != "" && selector.Type != primitive.TAG {
                return subData
        }
 
        if selector.Expression == "" || selector.Expression == _SubAll {
-               subData.ExpType = string(TAG)
+               subData.ExpType = string(primitive.TAG)
                subData.SubString = _SubAll
        } else {
                tags := strings.Split(selector.Expression, "\\|\\|")
@@ -847,7 +654,7 @@ func buildSubscriptionData(topic string, selector 
MessageSelector) *kernel.Subsc
        return subData
 }
 
-func getNextQueueOf(topic string) *kernel.MessageQueue {
+func getNextQueueOf(topic string) *primitive.MessageQueue {
        queues, err := kernel.FetchSubscribeMessageQueues(topic)
        if err != nil && len(queues) > 0 {
                rlog.Error(err.Error())
@@ -890,7 +697,7 @@ func clearCommitOffsetFlag(sysFlag int32) int32 {
        return sysFlag & (^0x1 << 0)
 }
 
-func tryFindBroker(mq *kernel.MessageQueue) *kernel.FindBrokerResult {
+func tryFindBroker(mq *primitive.MessageQueue) *kernel.FindBrokerResult {
        result := kernel.FindBrokerAddressInSubscribe(mq.BrokerName, 
recalculatePullFromWhichNode(mq), false)
 
        if result == nil {
@@ -903,11 +710,11 @@ var (
        pullFromWhichNodeTable sync.Map
 )
 
-func updatePullFromWhichNode(mq *kernel.MessageQueue, brokerId int64) {
+func updatePullFromWhichNode(mq *primitive.MessageQueue, brokerId int64) {
        pullFromWhichNodeTable.Store(mq.HashCode(), brokerId)
 }
 
-func recalculatePullFromWhichNode(mq *kernel.MessageQueue) int64 {
+func recalculatePullFromWhichNode(mq *primitive.MessageQueue) int64 {
        v, exist := pullFromWhichNodeTable.Load(mq.HashCode())
        if exist {
                return v.(int64)
diff --git a/consumer/consumer_test.go b/internal/consumer/consumer_test.go
similarity index 100%
rename from consumer/consumer_test.go
rename to internal/consumer/consumer_test.go
diff --git a/consumer/offset_store.go b/internal/consumer/offset_store.go
similarity index 86%
rename from consumer/offset_store.go
rename to internal/consumer/offset_store.go
index 0edf828..702ba0d 100644
--- a/consumer/offset_store.go
+++ b/internal/consumer/offset_store.go
@@ -20,8 +20,9 @@ package consumer
 import (
        "encoding/json"
        "fmt"
-       "github.com/apache/rocketmq-client-go/kernel"
-       "github.com/apache/rocketmq-client-go/remote"
+       "github.com/apache/rocketmq-client-go/internal/kernel"
+       "github.com/apache/rocketmq-client-go/internal/remote"
+       "github.com/apache/rocketmq-client-go/primitive"
        "github.com/apache/rocketmq-client-go/rlog"
        "github.com/apache/rocketmq-client-go/utils"
        "os"
@@ -50,10 +51,10 @@ func init() {
 }
 
 type OffsetStore interface {
-       persist(mqs []*kernel.MessageQueue)
-       remove(mq *kernel.MessageQueue)
-       read(mq *kernel.MessageQueue, t readType) int64
-       update(mq *kernel.MessageQueue, offset int64, increaseOnly bool)
+       persist(mqs []*primitive.MessageQueue)
+       remove(mq *primitive.MessageQueue)
+       read(mq *primitive.MessageQueue, t readType) int64
+       update(mq *primitive.MessageQueue, offset int64, increaseOnly bool)
 }
 
 type localFileOffsetStore struct {
@@ -108,7 +109,7 @@ func (local *localFileOffsetStore) load() {
        }
 }
 
-func (local *localFileOffsetStore) read(mq *kernel.MessageQueue, t readType) 
int64 {
+func (local *localFileOffsetStore) read(mq *primitive.MessageQueue, t 
readType) int64 {
        if t == _ReadFromMemory || t == _ReadMemoryThenStore {
                off := readFromMemory(local.OffsetTable, mq)
                if off >= 0 || (off == -1 && t == _ReadFromMemory) {
@@ -119,7 +120,7 @@ func (local *localFileOffsetStore) read(mq 
*kernel.MessageQueue, t readType) int
        return readFromMemory(local.OffsetTable, mq)
 }
 
-func (local *localFileOffsetStore) update(mq *kernel.MessageQueue, offset 
int64, increaseOnly bool) {
+func (local *localFileOffsetStore) update(mq *primitive.MessageQueue, offset 
int64, increaseOnly bool) {
        rlog.Debugf("update offset: %s to %d", mq, offset)
        localOffset, exist := local.OffsetTable[mq.Topic]
        if !exist {
@@ -143,7 +144,7 @@ func (local *localFileOffsetStore) update(mq 
*kernel.MessageQueue, offset int64,
        }
 }
 
-func (local *localFileOffsetStore) persist(mqs []*kernel.MessageQueue) {
+func (local *localFileOffsetStore) persist(mqs []*primitive.MessageQueue) {
        if len(mqs) == 0 {
                return
        }
@@ -171,7 +172,7 @@ func (local *localFileOffsetStore) persist(mqs 
[]*kernel.MessageQueue) {
        utils.CheckError(fmt.Sprintf("persist offset to %s", local.path), 
utils.WriteToFile(local.path, data))
 }
 
-func (local *localFileOffsetStore) remove(mq *kernel.MessageQueue) {
+func (local *localFileOffsetStore) remove(mq *primitive.MessageQueue) {
        // nothing to do
 }
 
@@ -190,7 +191,7 @@ func NewRemoteOffsetStore(group string, client 
*kernel.RMQClient) OffsetStore {
        }
 }
 
-func (r *remoteBrokerOffsetStore) persist(mqs []*kernel.MessageQueue) {
+func (r *remoteBrokerOffsetStore) persist(mqs []*primitive.MessageQueue) {
        r.mutex.Lock()
        defer r.mutex.Unlock()
        if len(mqs) == 0 {
@@ -217,7 +218,7 @@ func (r *remoteBrokerOffsetStore) persist(mqs 
[]*kernel.MessageQueue) {
        }
 }
 
-func (r *remoteBrokerOffsetStore) remove(mq *kernel.MessageQueue) {
+func (r *remoteBrokerOffsetStore) remove(mq *primitive.MessageQueue) {
        r.mutex.Lock()
        defer r.mutex.Unlock()
        if mq == nil {
@@ -231,7 +232,7 @@ func (r *remoteBrokerOffsetStore) remove(mq 
*kernel.MessageQueue) {
        delete(offset, mq.QueueId)
 }
 
-func (r *remoteBrokerOffsetStore) read(mq *kernel.MessageQueue, t readType) 
int64 {
+func (r *remoteBrokerOffsetStore) read(mq *primitive.MessageQueue, t readType) 
int64 {
        r.mutex.RLock()
        if t == _ReadFromMemory || t == _ReadMemoryThenStore {
                off := readFromMemory(r.OffsetTable, mq)
@@ -251,7 +252,7 @@ func (r *remoteBrokerOffsetStore) read(mq 
*kernel.MessageQueue, t readType) int6
        return off
 }
 
-func (r *remoteBrokerOffsetStore) update(mq *kernel.MessageQueue, offset 
int64, increaseOnly bool) {
+func (r *remoteBrokerOffsetStore) update(mq *primitive.MessageQueue, offset 
int64, increaseOnly bool) {
        rlog.Debugf("update offset: %s to %d", mq, offset)
        r.mutex.Lock()
        defer r.mutex.Unlock()
@@ -278,7 +279,7 @@ func (r *remoteBrokerOffsetStore) update(mq 
*kernel.MessageQueue, offset int64,
        }
 }
 
-func (r *remoteBrokerOffsetStore) fetchConsumeOffsetFromBroker(group string, 
mq *kernel.MessageQueue) (int64, error) {
+func (r *remoteBrokerOffsetStore) fetchConsumeOffsetFromBroker(group string, 
mq *primitive.MessageQueue) (int64, error) {
        broker := kernel.FindBrokerAddrByName(mq.BrokerName)
        if broker == "" {
                kernel.UpdateTopicRouteInfo(mq.Topic)
@@ -330,7 +331,7 @@ func (r *remoteBrokerOffsetStore) 
updateConsumeOffsetToBroker(group, topic strin
        return r.client.InvokeOneWay(broker, cmd, 5*time.Second)
 }
 
-func readFromMemory(table map[string]map[int]*queueOffset, mq 
*kernel.MessageQueue) int64 {
+func readFromMemory(table map[string]map[int]*queueOffset, mq 
*primitive.MessageQueue) int64 {
        localOffset, exist := table[mq.Topic]
        if !exist {
                return -1
diff --git a/consumer/process_queue.go b/internal/consumer/process_queue.go
similarity index 87%
rename from consumer/process_queue.go
rename to internal/consumer/process_queue.go
index 77e84fa..2ec865c 100644
--- a/consumer/process_queue.go
+++ b/internal/consumer/process_queue.go
@@ -18,7 +18,7 @@ limitations under the License.
 package consumer
 
 import (
-       "github.com/apache/rocketmq-client-go/kernel"
+       "github.com/apache/rocketmq-client-go/primitive"
        "github.com/apache/rocketmq-client-go/rlog"
        "github.com/emirpasic/gods/maps/treemap"
        "github.com/emirpasic/gods/utils"
@@ -51,7 +51,7 @@ type processQueue struct {
        consuming                  bool
        msgAccCnt                  int64
        lockConsume                sync.Mutex
-       msgCh                      chan []*kernel.MessageExt
+       msgCh                      chan []*primitive.MessageExt
 }
 
 func newProcessQueue() *processQueue {
@@ -60,12 +60,12 @@ func newProcessQueue() *processQueue {
                lastPullTime:    time.Now(),
                lastConsumeTime: time.Now(),
                lastLockTime:    time.Now(),
-               msgCh:           make(chan []*kernel.MessageExt, 32),
+               msgCh:           make(chan []*primitive.MessageExt, 32),
        }
        return pq
 }
 
-func (pq *processQueue) putMessage(messages ...*kernel.MessageExt) {
+func (pq *processQueue) putMessage(messages ...*primitive.MessageExt) {
        if messages == nil || len(messages) == 0 {
                return
        }
@@ -92,7 +92,7 @@ func (pq *processQueue) putMessage(messages 
...*kernel.MessageExt) {
        }
 
        msg := messages[len(messages)-1]
-       maxOffset, err := 
strconv.ParseInt(msg.Properties[kernel.PropertyMaxOffset], 10, 64)
+       maxOffset, err := 
strconv.ParseInt(msg.Properties[primitive.PropertyMaxOffset], 10, 64)
        if err != nil {
                acc := maxOffset - msg.QueueOffset
                if acc > 0 {
@@ -101,7 +101,7 @@ func (pq *processQueue) putMessage(messages 
...*kernel.MessageExt) {
        }
 }
 
-func (pq *processQueue) removeMessage(messages ...*kernel.MessageExt) int64 {
+func (pq *processQueue) removeMessage(messages ...*primitive.MessageExt) int64 
{
        result := int64(-1)
        pq.mutex.Lock()
        pq.lastConsumeTime = time.Now()
@@ -152,8 +152,8 @@ func (pq *processQueue) cleanExpiredMsg(consumer 
defaultConsumer) {
                        return
                }
                _, firstValue := pq.msgCache.Min()
-               msg := firstValue.(*kernel.MessageExt)
-               startTime := msg.Properties[kernel.PropertyConsumeStartTime]
+               msg := firstValue.(*primitive.MessageExt)
+               startTime := msg.Properties[primitive.PropertyConsumeStartTime]
                if startTime != "" {
                        st, err := strconv.ParseInt(startTime, 10, 64)
                        if err != nil {
@@ -187,15 +187,15 @@ func (pq *processQueue) getMaxSpan() int {
        return int(lastKey.(int64) - firstKey.(int64))
 }
 
-func (pq *processQueue) getMessages() []*kernel.MessageExt {
+func (pq *processQueue) getMessages() []*primitive.MessageExt {
        return <-pq.msgCh
 }
 
-func (pq *processQueue) takeMessages(number int) []*kernel.MessageExt {
+func (pq *processQueue) takeMessages(number int) []*primitive.MessageExt {
        for pq.msgCache.Empty() {
                time.Sleep(10 * time.Millisecond)
        }
-       result := make([]*kernel.MessageExt, number)
+       result := make([]*primitive.MessageExt, number)
        i := 0
        pq.mutex.Lock()
        for ; i < number; i++ {
@@ -203,7 +203,7 @@ func (pq *processQueue) takeMessages(number int) 
[]*kernel.MessageExt {
                if v == nil {
                        break
                }
-               result[i] = v.(*kernel.MessageExt)
+               result[i] = v.(*primitive.MessageExt)
                pq.msgCache.Remove(k)
        }
        pq.mutex.Unlock()
diff --git a/consumer/pull_consumer.go b/internal/consumer/pull_consumer.go
similarity index 70%
rename from consumer/pull_consumer.go
rename to internal/consumer/pull_consumer.go
index 1b4f950..9250771 100644
--- a/consumer/pull_consumer.go
+++ b/internal/consumer/pull_consumer.go
@@ -21,27 +21,23 @@ import (
        "context"
        "errors"
        "fmt"
-       "github.com/apache/rocketmq-client-go/kernel"
+       "github.com/apache/rocketmq-client-go/internal/kernel"
+       "github.com/apache/rocketmq-client-go/primitive"
        "strconv"
        "sync"
 )
 
-type MessageSelector struct {
-       Type       ExpressionType
-       Expression string
-}
-
 type PullConsumer interface {
        Start()
        Shutdown()
-       Pull(ctx context.Context, topic string, selector MessageSelector, 
numbers int) (*kernel.PullResult, error)
+       Pull(ctx context.Context, topic string, selector 
primitive.MessageSelector, numbers int) (*primitive.PullResult, error)
 }
 
 var (
        queueCounterTable sync.Map
 )
 
-func NewConsumer(config ConsumerOption) *defaultPullConsumer {
+func NewConsumer(config primitive.ConsumerOption) *defaultPullConsumer {
        return &defaultPullConsumer{
                option: config,
        }
@@ -49,10 +45,10 @@ func NewConsumer(config ConsumerOption) 
*defaultPullConsumer {
 
 type defaultPullConsumer struct {
        state     kernel.ServiceState
-       option    ConsumerOption
+       option    primitive.ConsumerOption
        client    *kernel.RMQClient
        GroupName string
-       Model     MessageModel
+       Model     primitive.MessageModel
        UnitMode  bool
 }
 
@@ -60,7 +56,7 @@ func (c *defaultPullConsumer) Start() {
        c.state = kernel.StateRunning
 }
 
-func (c *defaultPullConsumer) Pull(ctx context.Context, topic string, selector 
MessageSelector, numbers int) (*kernel.PullResult, error) {
+func (c *defaultPullConsumer) Pull(ctx context.Context, topic string, selector 
primitive.MessageSelector, numbers int) (*primitive.PullResult, error) {
        mq := getNextQueueOf(topic)
        if mq == nil {
                return nil, fmt.Errorf("prepard to pull topic: %s, but no queue 
is founded", topic)
@@ -78,22 +74,22 @@ func (c *defaultPullConsumer) Pull(ctx context.Context, 
topic string, selector M
 }
 
 // SubscribeWithChan ack manually
-func (c *defaultPullConsumer) SubscribeWithChan(topic, selector 
MessageSelector) (chan *kernel.Message, error) {
+func (c *defaultPullConsumer) SubscribeWithChan(topic, selector 
primitive.MessageSelector) (chan *primitive.Message, error) {
        return nil, nil
 }
 
 // SubscribeWithFunc ack automatic
-func (c *defaultPullConsumer) SubscribeWithFunc(topic, selector 
MessageSelector,
-       f func(msg *kernel.Message) ConsumeResult) error {
+func (c *defaultPullConsumer) SubscribeWithFunc(topic, selector 
primitive.MessageSelector,
+       f func(msg *primitive.Message) primitive.ConsumeResult) error {
        return nil
 }
 
-func (c *defaultPullConsumer) ACK(msg *kernel.Message, result ConsumeResult) {
+func (c *defaultPullConsumer) ACK(msg *primitive.Message, result 
primitive.ConsumeResult) {
 
 }
 
-func (c *defaultPullConsumer) pull(ctx context.Context, mq 
*kernel.MessageQueue, data *kernel.SubscriptionData,
-       offset int64, numbers int) (*kernel.PullResult, error) {
+func (c *defaultPullConsumer) pull(ctx context.Context, mq 
*primitive.MessageQueue, data *kernel.SubscriptionData,
+       offset int64, numbers int) (*primitive.PullResult, error) {
        err := c.makeSureStateOK()
        if err != nil {
                return nil, err
@@ -117,7 +113,7 @@ func (c *defaultPullConsumer) pull(ctx context.Context, mq 
*kernel.MessageQueue,
                return nil, fmt.Errorf("the broker %s does not exist", 
mq.BrokerName)
        }
 
-       if (data.ExpType == string(TAG)) && brokerResult.BrokerVersion < 
kernel.V4_1_0 {
+       if (data.ExpType == string(primitive.TAG)) && 
brokerResult.BrokerVersion < kernel.V4_1_0 {
                return nil, fmt.Errorf("the broker [%s, %v] does not upgrade to 
support for filter message by %v",
                        mq.BrokerName, brokerResult.BrokerVersion, data.ExpType)
        }
@@ -140,7 +136,7 @@ func (c *defaultPullConsumer) pull(ctx context.Context, mq 
*kernel.MessageQueue,
                ExpressionType:       string(data.ExpType),
        }
 
-       if data.ExpType == string(TAG) {
+       if data.ExpType == string(primitive.TAG) {
                pullRequest.SubVersion = 0
        } else {
                pullRequest.SubVersion = data.SubVersion
@@ -161,18 +157,18 @@ func (c *defaultPullConsumer) 
subscriptionAutomatically(topic string) {
        // TODO
 }
 
-func (c *defaultPullConsumer) nextOffsetOf(queue *kernel.MessageQueue) int64 {
+func (c *defaultPullConsumer) nextOffsetOf(queue *primitive.MessageQueue) 
int64 {
        return 0
 }
 
-func processPullResult(mq *kernel.MessageQueue, result *kernel.PullResult, 
data *kernel.SubscriptionData) {
+func processPullResult(mq *primitive.MessageQueue, result 
*primitive.PullResult, data *kernel.SubscriptionData) {
        updatePullFromWhichNode(mq, result.SuggestWhichBrokerId)
        switch result.Status {
-       case kernel.PullFound:
+       case primitive.PullFound:
                msgs := result.GetMessageExts()
                msgListFilterAgain := msgs
                if len(data.Tags) > 0 && data.ClassFilterMode {
-                       msgListFilterAgain = make([]*kernel.MessageExt, 
len(msgs))
+                       msgListFilterAgain = make([]*primitive.MessageExt, 
len(msgs))
                        for _, msg := range msgs {
                                _, exist := data.Tags[msg.GetTags()]
                                if exist {
@@ -184,13 +180,13 @@ func processPullResult(mq *kernel.MessageQueue, result 
*kernel.PullResult, data
                // TODO hook
 
                for _, msg := range msgListFilterAgain {
-                       traFlag, _ := 
strconv.ParseBool(msg.Properties[kernel.PropertyTransactionPrepared])
+                       traFlag, _ := 
strconv.ParseBool(msg.Properties[primitive.PropertyTransactionPrepared])
                        if traFlag {
-                               msg.TransactionId = 
msg.Properties[kernel.PropertyUniqueClientMessageIdKeyIndex]
+                               msg.TransactionId = 
msg.Properties[primitive.PropertyUniqueClientMessageIdKeyIndex]
                        }
 
-                       msg.Properties[kernel.PropertyMinOffset] = 
strconv.FormatInt(result.MinOffset, 10)
-                       msg.Properties[kernel.PropertyMaxOffset] = 
strconv.FormatInt(result.MaxOffset, 10)
+                       msg.Properties[primitive.PropertyMinOffset] = 
strconv.FormatInt(result.MinOffset, 10)
+                       msg.Properties[primitive.PropertyMaxOffset] = 
strconv.FormatInt(result.MaxOffset, 10)
                }
 
                result.SetMessageExts(msgListFilterAgain)
diff --git a/consumer/pull_consumer_test.go 
b/internal/consumer/pull_consumer_test.go
similarity index 100%
rename from consumer/pull_consumer_test.go
rename to internal/consumer/pull_consumer_test.go
diff --git a/consumer/push_consumer.go b/internal/consumer/push_consumer.go
similarity index 88%
rename from consumer/push_consumer.go
rename to internal/consumer/push_consumer.go
index 1cb5efd..b3d1e5d 100644
--- a/consumer/push_consumer.go
+++ b/internal/consumer/push_consumer.go
@@ -21,7 +21,8 @@ import (
        "context"
        "errors"
        "fmt"
-       "github.com/apache/rocketmq-client-go/kernel"
+       "github.com/apache/rocketmq-client-go/internal/kernel"
+       "github.com/apache/rocketmq-client-go/primitive"
        "github.com/apache/rocketmq-client-go/rlog"
        "github.com/apache/rocketmq-client-go/utils"
        "math"
@@ -38,31 +39,28 @@ import (
 // See quick start/Consumer in the example module for a typical usage.
 //
 // <strong>Thread Safety:</strong> After initialization, the instance can be 
regarded as thread-safe.
-type ConsumeResult int
 
 const (
-       Mb                           = 1024 * 1024
-       ConsumeSuccess ConsumeResult = iota
-       ConsumeRetryLater
+       Mb = 1024 * 1024
 )
 
 type PushConsumer interface {
        Start() error
        Shutdown()
-       Subscribe(topic string, selector MessageSelector,
-               f func(*ConsumeMessageContext, []*kernel.MessageExt) 
(ConsumeResult, error)) error
+       Subscribe(topic string, selector primitive.MessageSelector,
+               f func(*ConsumeMessageContext, []*primitive.MessageExt) 
(primitive.ConsumeResult, error)) error
 }
 
 type pushConsumer struct {
        *defaultConsumer
        queueFlowControlTimes        int
        queueMaxSpanFlowControlTimes int
-       consume                      func(*ConsumeMessageContext, 
[]*kernel.MessageExt) (ConsumeResult, error)
-       submitToConsume              func(*processQueue, *kernel.MessageQueue)
+       consume                      func(*ConsumeMessageContext, 
[]*primitive.MessageExt) (primitive.ConsumeResult, error)
+       submitToConsume              func(*processQueue, 
*primitive.MessageQueue)
        subscribedTopic              map[string]string
 }
 
-func NewPushConsumer(consumerGroup string, opt ConsumerOption) (PushConsumer, 
error) {
+func NewPushConsumer(consumerGroup string, opt primitive.ConsumerOption) 
(PushConsumer, error) {
        if err := utils.VerifyIP(opt.NameServerAddr); err != nil {
                return nil, err
        }
@@ -86,23 +84,10 @@ func NewPushConsumer(consumerGroup string, opt 
ConsumerOption) (PushConsumer, er
                option:         opt,
        }
 
-       switch opt.Strategy {
-       case StrategyAveragely:
-               dc.allocate = allocateByAveragely
-       case StrategyAveragelyCircle:
-               dc.allocate = allocateByAveragelyCircle
-       case StrategyConfig:
-               dc.allocate = allocateByConfig
-       case StrategyConsistentHash:
-               dc.allocate = allocateByConsistentHash
-       case StrategyMachineNearby:
-               dc.allocate = allocateByMachineNearby
-       case StrategyMachineRoom:
-               dc.allocate = allocateByMachineRoom
-       default:
-               dc.allocate = allocateByAveragely
+       if opt.Strategy == nil {
+               opt.Strategy = primitive.AllocateByAveragely
        }
-
+       dc.allocate = opt.Strategy
        p := &pushConsumer{
                defaultConsumer: dc,
                subscribedTopic: make(map[string]string, 0),
@@ -124,15 +109,15 @@ func (pc *pushConsumer) Start() error {
                pc.state = kernel.StateStartFailed
                pc.validate()
 
-               if pc.model == Clustering {
+               if pc.model == primitive.Clustering {
                        // set retry topic
                        retryTopic := kernel.GetRetryTopic(pc.consumerGroup)
                        pc.subscriptionDataTable.Store(retryTopic, 
buildSubscriptionData(retryTopic,
-                               MessageSelector{TAG, _SubAll}))
+                               primitive.MessageSelector{primitive.TAG, 
_SubAll}))
                }
 
                pc.client = 
kernel.GetOrNewRocketMQClient(pc.option.ClientOption)
-               if pc.model == Clustering {
+               if pc.model == primitive.Clustering {
                        pc.option.ChangeInstanceNameToPID()
                        pc.storage = NewRemoteOffsetStore(pc.consumerGroup, 
pc.client)
                } else {
@@ -177,8 +162,8 @@ func (pc *pushConsumer) Start() error {
 
 func (pc *pushConsumer) Shutdown() {}
 
-func (pc *pushConsumer) Subscribe(topic string, selector MessageSelector,
-       f func(*ConsumeMessageContext, []*kernel.MessageExt) (ConsumeResult, 
error)) error {
+func (pc *pushConsumer) Subscribe(topic string, selector 
primitive.MessageSelector,
+       f func(*ConsumeMessageContext, []*primitive.MessageExt) 
(primitive.ConsumeResult, error)) error {
        if pc.state != kernel.StateCreateJust {
                return errors.New("subscribe topic only started before")
        }
@@ -197,7 +182,7 @@ func (pc *pushConsumer) PersistConsumerOffset() {
        pc.defaultConsumer.persistConsumerOffset()
 }
 
-func (pc *pushConsumer) UpdateTopicSubscribeInfo(topic string, mqs 
[]*kernel.MessageQueue) {
+func (pc *pushConsumer) UpdateTopicSubscribeInfo(topic string, mqs 
[]*primitive.MessageQueue) {
        pc.defaultConsumer.updateTopicSubscribeInfo(topic, mqs)
 }
 
@@ -213,7 +198,7 @@ func (pc *pushConsumer) IsUnitMode() bool {
        return pc.unitMode
 }
 
-func (pc *pushConsumer) messageQueueChanged(topic string, mqAll, mqDivided 
[]*kernel.MessageQueue) {
+func (pc *pushConsumer) messageQueueChanged(topic string, mqAll, mqDivided 
[]*primitive.MessageQueue) {
        // TODO
 }
 
@@ -399,7 +384,7 @@ func (pc *pushConsumer) pullMessage(request *PullRequest) {
                        subExpression      string
                )
 
-               if pc.model == Clustering {
+               if pc.model == primitive.Clustering {
                        commitOffsetValue = pc.storage.read(request.mq, 
_ReadFromMemory)
                        if commitOffsetValue > 0 {
                                commitOffsetEnable = true
@@ -423,7 +408,7 @@ func (pc *pushConsumer) pullMessage(request *PullRequest) {
                        SysFlag:        sysFlag,
                        CommitOffset:   commitOffsetValue,
                        SubExpression:  _SubAll,
-                       ExpressionType: string(TAG), // TODO
+                       ExpressionType: string(primitive.TAG), // TODO
                }
                //
                //if data.ExpType == string(TAG) {
@@ -445,14 +430,14 @@ func (pc *pushConsumer) pullMessage(request *PullRequest) 
{
                        goto NEXT
                }
 
-               if result.Status == kernel.PullBrokerTimeout {
+               if result.Status == primitive.PullBrokerTimeout {
                        rlog.Warnf("pull broker: %s timeout", 
brokerResult.BrokerAddr)
                        sleepTime = _PullDelayTimeWhenError
                        goto NEXT
                }
 
                switch result.Status {
-               case kernel.PullFound:
+               case primitive.PullFound:
                        rlog.Debugf("Topic: %s, QueueId: %d found messages: 
%d", request.mq.Topic, request.mq.QueueId,
                                len(result.GetMessageExts()))
                        prevRequestOffset := request.nextOffset
@@ -472,19 +457,19 @@ func (pc *pushConsumer) pullMessage(request *PullRequest) 
{
                                rlog.Warnf("[BUG] pull message result maybe 
data wrong, [nextBeginOffset=%d, "+
                                        "firstMsgOffset=%d, 
prevRequestOffset=%d]", result.NextBeginOffset, firstMsgOffset, 
prevRequestOffset)
                        }
-               case kernel.PullNoNewMsg:
+               case primitive.PullNoNewMsg:
                        rlog.Debugf("Topic: %s, QueueId: %d no more msg, next 
offset: %d", request.mq.Topic, request.mq.QueueId, result.NextBeginOffset)
-               case kernel.PullNoMsgMatched:
+               case primitive.PullNoMsgMatched:
                        request.nextOffset = result.NextBeginOffset
                        pc.correctTagsOffset(request)
-               case kernel.PullOffsetIllegal:
+               case primitive.PullOffsetIllegal:
                        rlog.Warnf("the pull request offset illegal, {} {}", 
request.String(), result.String())
                        request.nextOffset = result.NextBeginOffset
                        pq.dropped = true
                        go func() {
                                time.Sleep(10 * time.Second)
                                pc.storage.update(request.mq, 
request.nextOffset, false)
-                               
pc.storage.persist([]*kernel.MessageQueue{request.mq})
+                               
pc.storage.persist([]*primitive.MessageQueue{request.mq})
                                pc.storage.remove(request.mq)
                                rlog.Warnf("fix the pull request offset: %s", 
request.String())
                        }()
@@ -499,7 +484,7 @@ func (pc *pushConsumer) correctTagsOffset(pr *PullRequest) {
        // TODO
 }
 
-func (pc *pushConsumer) sendMessageBack(ctx *ConsumeMessageContext, msg 
*kernel.MessageExt) bool {
+func (pc *pushConsumer) sendMessageBack(ctx *ConsumeMessageContext, msg 
*primitive.MessageExt) bool {
        return true
 }
 
@@ -514,7 +499,7 @@ func (pc *pushConsumer) resume() {
        rlog.Infof("resume consumer: %s", pc.consumerGroup)
 }
 
-func (pc *pushConsumer) resetOffset(topic string, table 
map[kernel.MessageQueue]int64) {
+func (pc *pushConsumer) resetOffset(topic string, table 
map[primitive.MessageQueue]int64) {
        //topic := cmd.ExtFields["topic"]
        //group := cmd.ExtFields["group"]
        //if topic == "" || group == "" {
@@ -529,7 +514,7 @@ func (pc *pushConsumer) resetOffset(topic string, table 
map[kernel.MessageQueue]
        //rlog.Infof("invoke reset offset operation from broker. brokerAddr=%s, 
topic=%s, group=%s, timestamp=%v",
        //      from, topic, group, t)
        //
-       //offsetTable := make(map[kernel.MessageQueue]int64, 0)
+       //offsetTable := make(map[primitive.MessageQueue]int64, 0)
        //err = json.Unmarshal(cmd.Body, &offsetTable)
        //if err != nil {
        //      rlog.Warnf("received reset offset command from: %s, but parse 
offset table: %s", err.Error())
@@ -541,7 +526,7 @@ func (pc *pushConsumer) resetOffset(topic string, table 
map[kernel.MessageQueue]
        //      return
        //}
 
-       set := make(map[int]*kernel.MessageQueue, 0)
+       set := make(map[int]*primitive.MessageQueue, 0)
        for k := range table {
                set[k.HashCode()] = &k
        }
@@ -559,7 +544,7 @@ func (pc *pushConsumer) resetOffset(topic string, table 
map[kernel.MessageQueue]
        if !exist {
                return
        }
-       queuesOfTopic := v.(map[int]*kernel.MessageQueue)
+       queuesOfTopic := v.(map[int]*primitive.MessageQueue)
        for k := range queuesOfTopic {
                q := set[k]
                if q != nil {
@@ -575,9 +560,9 @@ func (pc *pushConsumer) resetOffset(topic string, table 
map[kernel.MessageQueue]
        }
 }
 
-func (pc *pushConsumer) removeUnnecessaryMessageQueue(mq *kernel.MessageQueue, 
pq *processQueue) bool {
+func (pc *pushConsumer) removeUnnecessaryMessageQueue(mq 
*primitive.MessageQueue, pq *processQueue) bool {
        pc.defaultConsumer.removeUnnecessaryMessageQueue(mq, pq)
-       if !pc.consumeOrderly || Clustering != pc.model {
+       if !pc.consumeOrderly || primitive.Clustering != pc.model {
                return true
        }
        // TODO orderly
@@ -586,21 +571,21 @@ func (pc *pushConsumer) removeUnnecessaryMessageQueue(mq 
*kernel.MessageQueue, p
 
 type ConsumeMessageContext struct {
        consumerGroup string
-       msgs          []*kernel.MessageExt
-       mq            *kernel.MessageQueue
+       msgs          []*primitive.MessageExt
+       mq            *primitive.MessageQueue
        success       bool
        status        string
        // mqTractContext
        properties map[string]string
 }
 
-func (pc *pushConsumer) consumeMessageCurrently(pq *processQueue, mq 
*kernel.MessageQueue) {
+func (pc *pushConsumer) consumeMessageCurrently(pq *processQueue, mq 
*primitive.MessageQueue) {
        msgs := pq.getMessages()
        if msgs == nil {
                return
        }
        for count := 0; count < len(msgs); count++ {
-               var subMsgs []*kernel.MessageExt
+               var subMsgs []*primitive.MessageExt
                if count+pc.option.ConsumeMessageBatchMaxSize > len(msgs) {
                        subMsgs = msgs[count:]
                        count = len(msgs)
@@ -626,11 +611,11 @@ func (pc *pushConsumer) consumeMessageCurrently(pq 
*processQueue, mq *kernel.Mes
                        for idx := range subMsgs {
                                msg := subMsgs[idx]
                                if msg.Properties != nil {
-                                       retryTopic := 
msg.Properties[kernel.PropertyRetryTopic]
+                                       retryTopic := 
msg.Properties[primitive.PropertyRetryTopic]
                                        if retryTopic == "" && groupTopic == 
msg.Topic {
                                                msg.Topic = retryTopic
                                        }
-                                       
subMsgs[idx].Properties[kernel.PropertyConsumeStartTime] = strconv.FormatInt(
+                                       
subMsgs[idx].Properties[primitive.PropertyConsumeStartTime] = strconv.FormatInt(
                                                
beginTime.UnixNano()/int64(time.Millisecond), 10)
                                }
                        }
@@ -640,7 +625,7 @@ func (pc *pushConsumer) consumeMessageCurrently(pq 
*processQueue, mq *kernel.Mes
                                ctx.properties["ConsumeContextType"] = 
"EXCEPTION"
                        } else if consumeRT >= pc.option.ConsumeTimeout {
                                ctx.properties["ConsumeContextType"] = "TIMEOUT"
-                       } else if result == ConsumeSuccess {
+                       } else if result == primitive.ConsumeSuccess {
                                ctx.properties["ConsumeContextType"] = "SUCCESS"
                        } else {
                                ctx.properties["ConsumeContextType"] = 
"RECONSUME_LATER"
@@ -650,12 +635,12 @@ func (pc *pushConsumer) consumeMessageCurrently(pq 
*processQueue, mq *kernel.Mes
                        increaseConsumeRT(pc.consumerGroup, mq.Topic, consumeRT)
 
                        if !pq.dropped {
-                               msgBackFailed := make([]*kernel.MessageExt, 0)
-                               if result == ConsumeSuccess {
+                               msgBackFailed := make([]*primitive.MessageExt, 
0)
+                               if result == primitive.ConsumeSuccess {
                                        increaseConsumeOKTPS(pc.consumerGroup, 
mq.Topic, len(subMsgs))
                                } else {
                                        
increaseConsumeFailedTPS(pc.consumerGroup, mq.Topic, len(subMsgs))
-                                       if pc.model == BroadCasting {
+                                       if pc.model == primitive.BroadCasting {
                                                for i := 0; i < len(msgs); i++ {
                                                        
rlog.Warnf("BROADCASTING, the message=%s consume failed, drop it, {}", 
subMsgs[i])
                                                }
@@ -688,5 +673,5 @@ func (pc *pushConsumer) consumeMessageCurrently(pq 
*processQueue, mq *kernel.Mes
        }
 }
 
-func (pc *pushConsumer) consumeMessageOrderly(pq *processQueue, mq 
*kernel.MessageQueue) {
+func (pc *pushConsumer) consumeMessageOrderly(pq *processQueue, mq 
*primitive.MessageQueue) {
 }
diff --git a/consumer/push_consumer_test.go 
b/internal/consumer/push_consumer_test.go
similarity index 100%
rename from consumer/push_consumer_test.go
rename to internal/consumer/push_consumer_test.go
diff --git a/consumer/statistics.go b/internal/consumer/statistics.go
similarity index 100%
rename from consumer/statistics.go
rename to internal/consumer/statistics.go
diff --git a/kernel/client.go b/internal/kernel/client.go
similarity index 86%
rename from kernel/client.go
rename to internal/kernel/client.go
index 9dbecc6..78de681 100644
--- a/kernel/client.go
+++ b/internal/kernel/client.go
@@ -22,7 +22,8 @@ import (
        "context"
        "errors"
        "fmt"
-       "github.com/apache/rocketmq-client-go/remote"
+       "github.com/apache/rocketmq-client-go/internal/remote"
+       "github.com/apache/rocketmq-client-go/primitive"
        "github.com/apache/rocketmq-client-go/rlog"
        "os"
        "strconv"
@@ -65,28 +66,6 @@ func init() {
        }
 }
 
-type ClientOption struct {
-       NameServerAddr    string
-       ClientIP          string
-       InstanceName      string
-       UnitMode          bool
-       UnitName          string
-       VIPChannelEnabled bool
-       UseTLS            bool
-}
-
-func (opt *ClientOption) ChangeInstanceNameToPID() {
-       if opt.InstanceName == "DEFAULT" {
-               opt.InstanceName = strconv.Itoa(os.Getegid())
-       }
-}
-
-func (opt *ClientOption) String() string {
-       return fmt.Sprintf("ClientOption [ClientIP=%s, InstanceName=%s, "+
-               "UnitMode=%v, UnitName=%s, VIPChannelEnabled=%v, UseTLS=%v]", 
opt.ClientIP,
-               opt.InstanceName, opt.UnitMode, opt.UnitName, 
opt.VIPChannelEnabled, opt.UseTLS)
-}
-
 type InnerProducer interface {
        PublishTopicList() []string
        UpdateTopicPublishInfo(topic string, info *TopicPublishInfo)
@@ -97,7 +76,7 @@ type InnerProducer interface {
 
 type InnerConsumer interface {
        PersistConsumerOffset()
-       UpdateTopicSubscribeInfo(topic string, mqs []*MessageQueue)
+       UpdateTopicSubscribeInfo(topic string, mqs []*primitive.MessageQueue)
        IsSubscribeTopicNeedUpdate(topic string) bool
        SubscriptionDataList() []*SubscriptionData
        Rebalance()
@@ -105,7 +84,7 @@ type InnerConsumer interface {
 }
 
 type RMQClient struct {
-       option ClientOption
+       option primitive.ClientOption
        // group -> InnerProducer
        producerMap sync.Map
 
@@ -119,7 +98,7 @@ type RMQClient struct {
 
 var clientMap sync.Map
 
-func GetOrNewRocketMQClient(option ClientOption) *RMQClient {
+func GetOrNewRocketMQClient(option primitive.ClientOption) *RMQClient {
        client := &RMQClient{
                option:       option,
                remoteClient: remote.NewRemotingClient(),
@@ -298,12 +277,12 @@ func (c *RMQClient) UpdateTopicRouteInfo() {
 
 // SendMessageAsync send message with batch by async
 func (c *RMQClient) SendMessageAsync(ctx context.Context, brokerAddrs, 
brokerName string, request *SendMessageRequest,
-       msgs []*Message, f func(result *SendResult)) error {
+       msgs []*primitive.Message, f func(result *primitive.SendResult)) error {
        return nil
 }
 
 func (c *RMQClient) SendMessageOneWay(ctx context.Context, brokerAddrs string, 
request *SendMessageRequest,
-       msgs []*Message) (*SendResult, error) {
+       msgs []*primitive.Message) (*primitive.SendResult, error) {
        cmd := remote.NewRemotingCommand(ReqSendBatchMessage, request, 
encodeMessages(msgs))
        err := c.remoteClient.InvokeOneWay(brokerAddrs, cmd, 3*time.Second)
        if err != nil {
@@ -312,28 +291,28 @@ func (c *RMQClient) SendMessageOneWay(ctx 
context.Context, brokerAddrs string, r
        return nil, err
 }
 
-func (c *RMQClient) ProcessSendResponse(brokerName string, cmd 
*remote.RemotingCommand, msgs ...*Message) *SendResult {
-       var status SendStatus
+func (c *RMQClient) ProcessSendResponse(brokerName string, cmd 
*remote.RemotingCommand, msgs ...*primitive.Message) *primitive.SendResult {
+       var status primitive.SendStatus
        switch cmd.Code {
        case ResFlushDiskTimeout:
-               status = SendFlushDiskTimeout
+               status = primitive.SendFlushDiskTimeout
        case ResFlushSlaveTimeout:
-               status = SendFlushSlaveTimeout
+               status = primitive.SendFlushSlaveTimeout
        case ResSlaveNotAvailable:
-               status = SendSlaveNotAvailable
+               status = primitive.SendSlaveNotAvailable
        case ResSuccess:
-               status = SendOK
+               status = primitive.SendOK
        default:
                // TODO process unknown code
        }
 
        msgIDs := make([]string, 0)
        for i := 0; i < len(msgs); i++ {
-               msgIDs = append(msgIDs, 
msgs[i].Properties[PropertyUniqueClientMessageIdKeyIndex])
+               msgIDs = append(msgIDs, 
msgs[i].Properties[primitive.PropertyUniqueClientMessageIdKeyIndex])
        }
 
-       regionId := cmd.ExtFields[PropertyMsgRegion]
-       trace := cmd.ExtFields[PropertyTraceSwitch]
+       regionId := cmd.ExtFields[primitive.PropertyMsgRegion]
+       trace := cmd.ExtFields[primitive.PropertyTraceSwitch]
 
        if regionId == "" {
                regionId = defaultTraceRegionID
@@ -341,11 +320,11 @@ func (c *RMQClient) ProcessSendResponse(brokerName 
string, cmd *remote.RemotingC
 
        qId, _ := strconv.Atoi(cmd.ExtFields["queueId"])
        off, _ := strconv.ParseInt(cmd.ExtFields["queueOffset"], 10, 64)
-       return &SendResult{
+       return &primitive.SendResult{
                Status:      status,
                MsgID:       cmd.ExtFields["msgId"],
                OffsetMsgID: cmd.ExtFields["msgId"],
-               MessageQueue: &MessageQueue{
+               MessageQueue: &primitive.MessageQueue{
                        Topic:      msgs[0].Topic,
                        BrokerName: brokerName,
                        QueueId:    qId,
@@ -358,7 +337,7 @@ func (c *RMQClient) ProcessSendResponse(brokerName string, 
cmd *remote.RemotingC
 }
 
 // PullMessage with sync
-func (c *RMQClient) PullMessage(ctx context.Context, brokerAddrs string, 
request *PullMessageRequest) (*PullResult, error) {
+func (c *RMQClient) PullMessage(ctx context.Context, brokerAddrs string, 
request *PullMessageRequest) (*primitive.PullResult, error) {
        cmd := remote.NewRemotingCommand(ReqPullMessage, request, nil)
        res, err := c.remoteClient.InvokeSync(brokerAddrs, cmd, 3*time.Second)
        if err != nil {
@@ -368,17 +347,17 @@ func (c *RMQClient) PullMessage(ctx context.Context, 
brokerAddrs string, request
        return c.processPullResponse(res)
 }
 
-func (c *RMQClient) processPullResponse(response *remote.RemotingCommand) 
(*PullResult, error) {
-       pullResult := &PullResult{}
+func (c *RMQClient) processPullResponse(response *remote.RemotingCommand) 
(*primitive.PullResult, error) {
+       pullResult := &primitive.PullResult{}
        switch response.Code {
        case ResSuccess:
-               pullResult.Status = PullFound
+               pullResult.Status = primitive.PullFound
        case ResPullNotFound:
-               pullResult.Status = PullNoNewMsg
+               pullResult.Status = primitive.PullNoNewMsg
        case ResPullRetryImmediately:
-               pullResult.Status = PullNoMsgMatched
+               pullResult.Status = primitive.PullNoMsgMatched
        case ResPullOffsetMoved:
-               pullResult.Status = PullOffsetIllegal
+               pullResult.Status = primitive.PullOffsetIllegal
        default:
                return nil, fmt.Errorf("unknown Response Code: %d, remark: %s", 
response.Code, response.Remark)
        }
@@ -403,13 +382,13 @@ func (c *RMQClient) processPullResponse(response 
*remote.RemotingCommand) (*Pull
                pullResult.SuggestWhichBrokerId, _ = strconv.ParseInt(v, 10, 64)
        }
 
-       pullResult.messageExts = decodeMessage(response.Body)
+       //pullResult.messageExts = decodeMessage(response.Body) TODO parse in 
top
 
        return pullResult, nil
 }
 
 // PullMessageAsync pull message async
-func (c *RMQClient) PullMessageAsync(ctx context.Context, brokerAddrs string, 
request *PullMessageRequest, f func(result *PullResult)) error {
+func (c *RMQClient) PullMessageAsync(ctx context.Context, brokerAddrs string, 
request *PullMessageRequest, f func(result *primitive.PullResult)) error {
        return nil
 }
 
@@ -501,13 +480,13 @@ func (c *RMQClient) isNeedUpdateSubscribeInfo(topic 
string) bool {
        return result
 }
 
-func routeData2SubscribeInfo(topic string, data *TopicRouteData) 
[]*MessageQueue {
-       list := make([]*MessageQueue, 0)
+func routeData2SubscribeInfo(topic string, data *TopicRouteData) 
[]*primitive.MessageQueue {
+       list := make([]*primitive.MessageQueue, 0)
        for idx := range data.QueueDataList {
                qd := data.QueueDataList[idx]
                if queueIsReadable(qd.Perm) {
                        for i := 0; i < qd.ReadQueueNums; i++ {
-                               list = append(list, &MessageQueue{
+                               list = append(list, &primitive.MessageQueue{
                                        Topic:      topic,
                                        BrokerName: qd.BrokerName,
                                        QueueId:    i,
@@ -518,7 +497,7 @@ func routeData2SubscribeInfo(topic string, data 
*TopicRouteData) []*MessageQueue
        return list
 }
 
-func encodeMessages(message []*Message) []byte {
+func encodeMessages(message []*primitive.Message) []byte {
        var buffer bytes.Buffer
        index := 0
        for index < len(message) {
diff --git a/kernel/client_test.go b/internal/kernel/client_test.go
similarity index 100%
rename from kernel/client_test.go
rename to internal/kernel/client_test.go
diff --git a/kernel/constants.go b/internal/kernel/constants.go
similarity index 100%
rename from kernel/constants.go
rename to internal/kernel/constants.go
diff --git a/internal/kernel/model.go b/internal/kernel/model.go
new file mode 100644
index 0000000..b3ef62e
--- /dev/null
+++ b/internal/kernel/model.go
@@ -0,0 +1,80 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package kernel
+
+import (
+       "encoding/json"
+       "github.com/apache/rocketmq-client-go/rlog"
+)
+
+type FindBrokerResult struct {
+       BrokerAddr    string
+       Slave         bool
+       BrokerVersion int32
+}
+
+type (
+       // groupName of consumer
+       producerData string
+
+       consumeType string
+
+       ServiceState int
+)
+
+const (
+       StateCreateJust ServiceState = iota
+       StateStartFailed
+       StateRunning
+       StateShutdown
+)
+
+type SubscriptionData struct {
+       ClassFilterMode bool
+       Topic           string
+       SubString       string
+       Tags            map[string]bool
+       Codes           map[int32]bool
+       SubVersion      int64
+       ExpType         string
+}
+
+type consumerData struct {
+       GroupName         string              `json:"groupName"`
+       CType             consumeType         `json:"consumeType"`
+       MessageModel      string              `json:"messageModel"`
+       Where             string              `json:"consumeFromWhere"`
+       SubscriptionDatas []*SubscriptionData `json:"subscriptionDataSet"`
+       UnitMode          bool                `json:"unitMode"`
+}
+
+type heartbeatData struct {
+       ClientId      string         `json:"clientID"`
+       ProducerDatas []producerData `json:"producerDataSet"`
+       ConsumerDatas []consumerData `json:"consumerDataSet"`
+}
+
+func (data *heartbeatData) encode() []byte {
+       d, err := json.Marshal(data)
+       if err != nil {
+               rlog.Errorf("marshal heartbeatData error: %s", err.Error())
+               return nil
+       }
+       rlog.Info(string(d))
+       return d
+}
diff --git a/kernel/mq_version.go b/internal/kernel/mq_version.go
similarity index 100%
rename from kernel/mq_version.go
rename to internal/kernel/mq_version.go
diff --git a/kernel/perm.go b/internal/kernel/perm.go
similarity index 100%
rename from kernel/perm.go
rename to internal/kernel/perm.go
diff --git a/kernel/request.go b/internal/kernel/request.go
similarity index 100%
rename from kernel/request.go
rename to internal/kernel/request.go
diff --git a/kernel/response.go b/internal/kernel/response.go
similarity index 100%
rename from kernel/response.go
rename to internal/kernel/response.go
diff --git a/kernel/route.go b/internal/kernel/route.go
similarity index 96%
rename from kernel/route.go
rename to internal/kernel/route.go
index d0678e6..f216c0e 100644
--- a/kernel/route.go
+++ b/internal/kernel/route.go
@@ -20,7 +20,8 @@ package kernel
 import (
        "encoding/json"
        "errors"
-       "github.com/apache/rocketmq-client-go/remote"
+       "github.com/apache/rocketmq-client-go/internal/remote"
+       "github.com/apache/rocketmq-client-go/primitive"
        "github.com/apache/rocketmq-client-go/rlog"
        "github.com/apache/rocketmq-client-go/utils"
        "github.com/tidwall/gjson"
@@ -98,7 +99,7 @@ func cleanOfflineBroker() {
 type TopicPublishInfo struct {
        OrderTopic          bool
        HaveTopicRouterInfo bool
-       MqList              []*MessageQueue
+       MqList              []*primitive.MessageQueue
        RouteData           *TopicRouteData
        TopicQueueIndex     int32
 }
@@ -221,19 +222,19 @@ func FindBrokerAddressInSubscribe(brokerName string, 
brokerId int64, onlyThisBro
        return result
 }
 
-func FetchSubscribeMessageQueues(topic string) ([]*MessageQueue, error) {
+func FetchSubscribeMessageQueues(topic string) ([]*primitive.MessageQueue, 
error) {
        routeData, err := queryTopicRouteInfoFromServer(topic)
 
        if err != nil {
                return nil, err
        }
 
-       mqs := make([]*MessageQueue, 0)
+       mqs := make([]*primitive.MessageQueue, 0)
 
        for _, qd := range routeData.QueueDataList {
                if queueIsReadable(qd.Perm) {
                        for i := 0; i < qd.ReadQueueNums; i++ {
-                               mqs = append(mqs, &MessageQueue{Topic: topic, 
BrokerName: qd.BrokerName, QueueId: i})
+                               mqs = append(mqs, 
&primitive.MessageQueue{Topic: topic, BrokerName: qd.BrokerName, QueueId: i})
                        }
                }
        }
@@ -321,7 +322,7 @@ func routeData2PublishInfo(topic string, data 
*TopicRouteData) *TopicPublishInfo
                        item := strings.Split(broker, ":")
                        nums, _ := strconv.Atoi(item[1])
                        for i := 0; i < nums; i++ {
-                               mq := &MessageQueue{
+                               mq := &primitive.MessageQueue{
                                        Topic:      topic,
                                        BrokerName: item[0],
                                        QueueId:    i,
@@ -357,7 +358,7 @@ func routeData2PublishInfo(topic string, data 
*TopicRouteData) *TopicPublishInfo
                }
 
                for i := 0; i < qd.WriteQueueNums; i++ {
-                       mq := &MessageQueue{
+                       mq := &primitive.MessageQueue{
                                Topic:      topic,
                                BrokerName: qd.BrokerName,
                                QueueId:    i,
diff --git a/kernel/route_test.go b/internal/kernel/route_test.go
similarity index 100%
rename from kernel/route_test.go
rename to internal/kernel/route_test.go
diff --git a/kernel/transaction.go b/internal/kernel/transaction.go
similarity index 100%
rename from kernel/transaction.go
rename to internal/kernel/transaction.go
diff --git a/kernel/validators.go b/internal/kernel/validators.go
similarity index 100%
rename from kernel/validators.go
rename to internal/kernel/validators.go
diff --git a/producer/producer.go b/internal/producer/producer.go
similarity index 85%
rename from producer/producer.go
rename to internal/producer/producer.go
index 8bb0bf8..52657ee 100644
--- a/producer/producer.go
+++ b/internal/producer/producer.go
@@ -21,8 +21,9 @@ import (
        "context"
        "errors"
        "fmt"
-       "github.com/apache/rocketmq-client-go/kernel"
-       "github.com/apache/rocketmq-client-go/remote"
+       "github.com/apache/rocketmq-client-go/internal/kernel"
+       "github.com/apache/rocketmq-client-go/internal/remote"
+       "github.com/apache/rocketmq-client-go/primitive"
        "github.com/apache/rocketmq-client-go/rlog"
        "github.com/apache/rocketmq-client-go/utils"
        "os"
@@ -34,11 +35,11 @@ import (
 type Producer interface {
        Start() error
        Shutdown() error
-       SendSync(context.Context, *kernel.Message) (*kernel.SendResult, error)
-       SendOneWay(context.Context, *kernel.Message) error
+       SendSync(context.Context, *primitive.Message) (*primitive.SendResult, 
error)
+       SendOneWay(context.Context, *primitive.Message) error
 }
 
-func NewProducer(opt ProducerOptions) (Producer, error) {
+func NewProducer(opt primitive.ProducerOptions) (Producer, error) {
        if err := utils.VerifyIP(opt.NameServerAddr); err != nil {
                return nil, err
        }
@@ -63,18 +64,10 @@ type defaultProducer struct {
        group       string
        client      *kernel.RMQClient
        state       kernel.ServiceState
-       options     ProducerOptions
+       options     primitive.ProducerOptions
        publishInfo sync.Map
 }
 
-type ProducerOptions struct {
-       kernel.ClientOption
-       NameServerAddr           string
-       GroupName                string
-       RetryTimesWhenSendFailed int
-       UnitMode                 bool
-}
-
 func (p *defaultProducer) Start() error {
        p.state = kernel.StateRunning
        p.client.RegisterProducer(p.group, p)
@@ -86,7 +79,7 @@ func (p *defaultProducer) Shutdown() error {
        return nil
 }
 
-func (p *defaultProducer) SendSync(ctx context.Context, msg *kernel.Message) 
(*kernel.SendResult, error) {
+func (p *defaultProducer) SendSync(ctx context.Context, msg 
*primitive.Message) (*primitive.SendResult, error) {
        if msg == nil {
                return nil, errors.New("message is nil")
        }
@@ -122,7 +115,7 @@ func (p *defaultProducer) SendSync(ctx context.Context, msg 
*kernel.Message) (*k
        return nil, err
 }
 
-func (p *defaultProducer) SendOneWay(ctx context.Context, msg *kernel.Message) 
error {
+func (p *defaultProducer) SendOneWay(ctx context.Context, msg 
*primitive.Message) error {
        if msg == nil {
                return errors.New("message is nil")
        }
@@ -157,7 +150,7 @@ func (p *defaultProducer) SendOneWay(ctx context.Context, 
msg *kernel.Message) e
        return err
 }
 
-func (p *defaultProducer) buildSendRequest(mq *kernel.MessageQueue, msg 
*kernel.Message) *remote.RemotingCommand {
+func (p *defaultProducer) buildSendRequest(mq *primitive.MessageQueue, msg 
*primitive.Message) *remote.RemotingCommand {
        req := &kernel.SendMessageRequest{
                ProducerGroup:  p.group,
                Topic:          mq.Topic,
@@ -173,7 +166,7 @@ func (p *defaultProducer) buildSendRequest(mq 
*kernel.MessageQueue, msg *kernel.
        return remote.NewRemotingCommand(kernel.ReqSendMessage, req, msg.Body)
 }
 
-func (p *defaultProducer) selectMessageQueue(topic string) 
*kernel.MessageQueue {
+func (p *defaultProducer) selectMessageQueue(topic string) 
*primitive.MessageQueue {
        v, exist := p.publishInfo.Load(topic)
 
        if !exist {
diff --git a/remote/codec.go b/internal/remote/codec.go
similarity index 100%
rename from remote/codec.go
rename to internal/remote/codec.go
diff --git a/remote/codec_test.go b/internal/remote/codec_test.go
similarity index 100%
rename from remote/codec_test.go
rename to internal/remote/codec_test.go
diff --git a/remote/remote_client.go b/internal/remote/remote_client.go
similarity index 100%
rename from remote/remote_client.go
rename to internal/remote/remote_client.go
diff --git a/remote/remote_client_test.go 
b/internal/remote/remote_client_test.go
similarity index 100%
rename from remote/remote_client_test.go
rename to internal/remote/remote_client_test.go
diff --git a/remote/rpchook.go b/internal/remote/rpchook.go
similarity index 100%
rename from remote/rpchook.go
rename to internal/remote/rpchook.go
diff --git a/primitive/consume.go b/primitive/consume.go
new file mode 100644
index 0000000..40e89b9
--- /dev/null
+++ b/primitive/consume.go
@@ -0,0 +1,128 @@
+package primitive
+
+// Message model defines the way how messages are delivered to each consumer 
clients.
+// </p>
+//
+// RocketMQ supports two message models: clustering and broadcasting. If 
clustering is set, consumer clients with
+// the same {@link #consumerGroup} would only consume shards of the messages 
subscribed, which achieves load
+// balances; Conversely, if the broadcasting is set, each consumer client will 
consume all subscribed messages
+// separately.
+// </p>
+//
+// This field defaults to clustering.
+type MessageModel int
+
+const (
+       BroadCasting MessageModel = iota
+       Clustering
+)
+
+func (mode MessageModel) String() string {
+       switch mode {
+       case BroadCasting:
+               return "BroadCasting"
+       case Clustering:
+               return "Clustering"
+       default:
+               return "Unknown"
+       }
+}
+
+// Consuming point on consumer booting.
+// </p>
+//
+// There are three consuming points:
+// <ul>
+// <li>
+// <code>CONSUME_FROM_LAST_OFFSET</code>: consumer clients pick up where it 
stopped previously.
+// If it were a newly booting up consumer client, according aging of the 
consumer group, there are two
+// cases:
+// <ol>
+// <li>
+// if the consumer group is created so recently that the earliest message 
being subscribed has yet
+// expired, which means the consumer group represents a lately launched 
business, consuming will
+// start from the very beginning;
+// </li>
+// <li>
+// if the earliest message being subscribed has expired, consuming will start 
from the latest
+// messages, meaning messages born prior to the booting timestamp would be 
ignored.
+// </li>
+// </ol>
+// </li>
+// <li>
+// <code>CONSUME_FROM_FIRST_OFFSET</code>: Consumer client will start from 
earliest messages available.
+// </li>
+// <li>
+// <code>CONSUME_FROM_TIMESTAMP</code>: Consumer client will start from 
specified timestamp, which means
+// messages born prior to {@link #consumeTimestamp} will be ignored
+// </li>
+// </ul>
+type ConsumeFromWhere int
+
+const (
+       ConsumeFromLastOffset ConsumeFromWhere = iota
+       ConsumeFromFirstOffset
+       ConsumeFromTimestamp
+)
+
+type ExpressionType string
+
+const (
+       /**
+        * <ul>
+        * Keywords:
+        * <li>{@code AND, OR, NOT, BETWEEN, IN, TRUE, FALSE, IS, NULL}</li>
+        * </ul>
+        * <p/>
+        * <ul>
+        * Data type:
+        * <li>Boolean, like: TRUE, FALSE</li>
+        * <li>String, like: 'abc'</li>
+        * <li>Decimal, like: 123</li>
+        * <li>Float number, like: 3.1415</li>
+        * </ul>
+        * <p/>
+        * <ul>
+        * Grammar:
+        * <li>{@code AND, OR}</li>
+        * <li>{@code >, >=, <, <=, =}</li>
+        * <li>{@code BETWEEN A AND B}, equals to {@code >=A AND <=B}</li>
+        * <li>{@code NOT BETWEEN A AND B}, equals to {@code >B OR <A}</li>
+        * <li>{@code IN ('a', 'b')}, equals to {@code ='a' OR ='b'}, this 
operation only support String type.</li>
+        * <li>{@code IS NULL}, {@code IS NOT NULL}, check parameter whether is 
null, or not.</li>
+        * <li>{@code =TRUE}, {@code =FALSE}, check parameter whether is true, 
or false.</li>
+        * </ul>
+        * <p/>
+        * <p>
+        * Example:
+        * (a > 10 AND a < 100) OR (b IS NOT NULL AND b=TRUE)
+        * </p>
+        */
+       SQL92 = ExpressionType("SQL92")
+
+       /**
+        * Only support or operation such as
+        * "tag1 || tag2 || tag3", <br>
+        * If null or * expression, meaning subscribe all.
+        */
+       TAG = ExpressionType("TAG")
+)
+
+func IsTagType(exp string) bool {
+       if exp == "" || exp == "TAG" {
+               return true
+       }
+       return false
+}
+
+type MessageSelector struct {
+       Type       ExpressionType
+       Expression string
+}
+
+type ConsumeResult int
+
+const (
+       ConsumeSuccess ConsumeResult = iota
+       ConsumeRetryLater
+)
diff --git a/kernel/message.go b/primitive/message.go
similarity index 85%
rename from kernel/message.go
rename to primitive/message.go
index 0f65853..d15c8a3 100644
--- a/kernel/message.go
+++ b/primitive/message.go
@@ -15,9 +15,12 @@ See the License for the specific language governing 
permissions and
 limitations under the License.
 */
 
-package kernel
+package primitive
 
-import "fmt"
+import (
+       "fmt"
+       "github.com/apache/rocketmq-client-go/utils"
+)
 
 const (
        PropertyKeySeparator                   = " "
@@ -118,3 +121,28 @@ func (msgExt *MessageExt) String() string {
                msgExt.StoreTimestamp, msgExt.StoreHost, 
msgExt.CommitLogOffset, msgExt.BodyCRC, msgExt.ReconsumeTimes,
                msgExt.PreparedTransactionOffset)
 }
+
+// MessageQueue message queue
+type MessageQueue struct {
+       Topic      string `json:"topic"`
+       BrokerName string `json:"brokerName"`
+       QueueId    int    `json:"queueId"`
+}
+
+func (mq *MessageQueue) String() string {
+       return fmt.Sprintf("MessageQueue [topic=%s, brokerName=%s, 
queueId=%d]", mq.Topic, mq.BrokerName, mq.QueueId)
+}
+
+func (mq *MessageQueue) HashCode() int {
+       result := 1
+       result = 31*result + utils.HashString(mq.BrokerName)
+       result = 31*result + mq.QueueId
+       result = 31*result + utils.HashString(mq.Topic)
+
+       return result
+}
+
+func (mq *MessageQueue) Equals(queue *MessageQueue) bool {
+       // TODO
+       return true
+}
diff --git a/primitive/options.go b/primitive/options.go
new file mode 100644
index 0000000..b657ac0
--- /dev/null
+++ b/primitive/options.go
@@ -0,0 +1,132 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package primitive
+
+import (
+       "fmt"
+       "os"
+       "strconv"
+       "time"
+)
+
+type ProducerOptions struct {
+       ClientOption
+       NameServerAddr           string
+       GroupName                string
+       RetryTimesWhenSendFailed int
+       UnitMode                 bool
+}
+
+type ConsumerOption struct {
+       ClientOption
+       NameServerAddr string
+
+       /**
+        * Backtracking consumption time with second precision. Time format is
+        * 20131223171201<br>
+        * Implying Seventeen twelve and 01 seconds on December 23, 2013 
year<br>
+        * Default backtracking consumption time Half an hour ago.
+        */
+       ConsumeTimestamp string
+
+       // The socket timeout in milliseconds
+       ConsumerPullTimeout time.Duration
+
+       // Concurrently max span offset.it has no effect on sequential 
consumption
+       ConsumeConcurrentlyMaxSpan int
+
+       // Flow control threshold on queue level, each message queue will cache 
at most 1000 messages by default,
+       // Consider the {PullBatchSize}, the instantaneous value may exceed the 
limit
+       PullThresholdForQueue int64
+
+       // Limit the cached message size on queue level, each message queue 
will cache at most 100 MiB messages by default,
+       // Consider the {@code pullBatchSize}, the instantaneous value may 
exceed the limit
+       //
+       // The size of a message only measured by message body, so it's not 
accurate
+       PullThresholdSizeForQueue int
+
+       // Flow control threshold on topic level, default value is -1(Unlimited)
+       //
+       // The value of {@code pullThresholdForQueue} will be overwrote and 
calculated based on
+       // {@code pullThresholdForTopic} if it is't unlimited
+       //
+       // For example, if the value of pullThresholdForTopic is 1000 and 10 
message queues are assigned to this consumer,
+       // then pullThresholdForQueue will be set to 100
+       PullThresholdForTopic int
+
+       // Limit the cached message size on topic level, default value is -1 
MiB(Unlimited)
+       //
+       // The value of {@code pullThresholdSizeForQueue} will be overwrote and 
calculated based on
+       // {@code pullThresholdSizeForTopic} if it is't unlimited
+       //
+       // For example, if the value of pullThresholdSizeForTopic is 1000 MiB 
and 10 message queues are
+       // assigned to this consumer, then pullThresholdSizeForQueue will be 
set to 100 MiB
+       PullThresholdSizeForTopic int
+
+       // Message pull Interval
+       PullInterval time.Duration
+
+       // Batch consumption size
+       ConsumeMessageBatchMaxSize int
+
+       // Batch pull size
+       PullBatchSize int32
+
+       // Whether update subscription relationship when every pull
+       PostSubscriptionWhenPull bool
+
+       // Max re-consume times. -1 means 16 times.
+       //
+       // If messages are re-consumed more than {@link #maxReconsumeTimes} 
before success, it's be directed to a deletion
+       // queue waiting.
+       MaxReconsumeTimes int
+
+       // Suspending pulling time for cases requiring slow pulling like 
flow-control scenario.
+       SuspendCurrentQueueTimeMillis time.Duration
+
+       // Maximum amount of time a message may block the consuming thread.
+       ConsumeTimeout time.Duration
+
+       ConsumerModel  MessageModel
+       Strategy       AllocateStrategy
+       ConsumeOrderly bool
+       FromWhere      ConsumeFromWhere
+       // TODO traceDispatcher
+}
+
+func (opt *ClientOption) ChangeInstanceNameToPID() {
+       if opt.InstanceName == "DEFAULT" {
+               opt.InstanceName = strconv.Itoa(os.Getegid())
+       }
+}
+
+func (opt *ClientOption) String() string {
+       return fmt.Sprintf("ClientOption [ClientIP=%s, InstanceName=%s, "+
+               "UnitMode=%v, UnitName=%s, VIPChannelEnabled=%v, UseTLS=%v]", 
opt.ClientIP,
+               opt.InstanceName, opt.UnitMode, opt.UnitName, 
opt.VIPChannelEnabled, opt.UseTLS)
+}
+
+type ClientOption struct {
+       NameServerAddr    string
+       ClientIP          string
+       InstanceName      string
+       UnitMode          bool
+       UnitName          string
+       VIPChannelEnabled bool
+       UseTLS            bool
+}
diff --git a/kernel/model.go b/primitive/result.go
similarity index 71%
rename from kernel/model.go
rename to primitive/result.go
index 8ed0f97..0c1e738 100644
--- a/kernel/model.go
+++ b/primitive/result.go
@@ -15,14 +15,12 @@ See the License for the specific language governing 
permissions and
 limitations under the License.
 */
 
-package kernel
+package primitive
 
 import (
        "bytes"
        "encoding/binary"
-       "encoding/json"
        "fmt"
-       "github.com/apache/rocketmq-client-go/rlog"
        "github.com/apache/rocketmq-client-go/utils"
 )
 
@@ -209,85 +207,3 @@ func toMessages(messageExts []*MessageExt) []*Message {
 
        return msgs
 }
-
-// MessageQueue message queue
-type MessageQueue struct {
-       Topic      string `json:"topic"`
-       BrokerName string `json:"brokerName"`
-       QueueId    int    `json:"queueId"`
-}
-
-func (mq *MessageQueue) String() string {
-       return fmt.Sprintf("MessageQueue [topic=%s, brokerName=%s, 
queueId=%d]", mq.Topic, mq.BrokerName, mq.QueueId)
-}
-
-func (mq *MessageQueue) HashCode() int {
-       result := 1
-       result = 31*result + utils.HashString(mq.BrokerName)
-       result = 31*result + mq.QueueId
-       result = 31*result + utils.HashString(mq.Topic)
-
-       return result
-}
-
-func (mq *MessageQueue) Equals(queue *MessageQueue) bool {
-       // TODO
-       return true
-}
-
-type FindBrokerResult struct {
-       BrokerAddr    string
-       Slave         bool
-       BrokerVersion int32
-}
-
-type (
-       // groupName of consumer
-       producerData string
-
-       consumeType string
-
-       ServiceState int
-)
-
-const (
-       StateCreateJust ServiceState = iota
-       StateStartFailed
-       StateRunning
-       StateShutdown
-)
-
-type SubscriptionData struct {
-       ClassFilterMode bool
-       Topic           string
-       SubString       string
-       Tags            map[string]bool
-       Codes           map[int32]bool
-       SubVersion      int64
-       ExpType         string
-}
-
-type consumerData struct {
-       GroupName         string              `json:"groupName"`
-       CType             consumeType         `json:"consumeType"`
-       MessageModel      string              `json:"messageModel"`
-       Where             string              `json:"consumeFromWhere"`
-       SubscriptionDatas []*SubscriptionData `json:"subscriptionDataSet"`
-       UnitMode          bool                `json:"unitMode"`
-}
-
-type heartbeatData struct {
-       ClientId      string         `json:"clientID"`
-       ProducerDatas []producerData `json:"producerDataSet"`
-       ConsumerDatas []consumerData `json:"consumerDataSet"`
-}
-
-func (data *heartbeatData) encode() []byte {
-       d, err := json.Marshal(data)
-       if err != nil {
-               rlog.Errorf("marshal heartbeatData error: %s", err.Error())
-               return nil
-       }
-       rlog.Info(string(d))
-       return d
-}
diff --git a/primitive/strategy.go b/primitive/strategy.go
new file mode 100644
index 0000000..56dfef1
--- /dev/null
+++ b/primitive/strategy.go
@@ -0,0 +1,117 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package primitive
+
+import (
+       "github.com/apache/rocketmq-client-go/rlog"
+       "github.com/apache/rocketmq-client-go/utils"
+)
+
+// Strategy Algorithm for message allocating between consumers
+// An allocate strategy proxy for based on machine room nearside priority. An 
actual allocate strategy can be
+// specified.
+//
+// If any consumer is alive in a machine room, the message queue of the broker 
which is deployed in the same machine
+// should only be allocated to those. Otherwise, those message queues can be 
shared along all consumers since there are
+// no alive consumer to monopolize them.
+//
+// Average Hashing queue algorithm
+// Cycle average Hashing queue algorithm
+// Use Message Queue specified
+// Computer room Hashing queue algorithm, such as Alipay logic room
+// Consistent Hashing queue algorithm
+
+type AllocateStrategy func(string, string, []*MessageQueue, []string) 
[]*MessageQueue
+
+func AllocateByAveragely(consumerGroup, currentCID string, mqAll 
[]*MessageQueue,
+       cidAll []string) []*MessageQueue {
+       if currentCID == "" || utils.IsArrayEmpty(mqAll) || 
utils.IsArrayEmpty(cidAll) {
+               return nil
+       }
+       var (
+               find  bool
+               index int
+       )
+
+       for idx := range cidAll {
+               if cidAll[idx] == currentCID {
+                       find = true
+                       index = idx
+                       break
+               }
+       }
+       if !find {
+               rlog.Infof("[BUG] ConsumerGroup=%s, ConsumerId=%s not in 
cidAll:%+v", consumerGroup, currentCID, cidAll)
+               return nil
+       }
+
+       mqSize := len(mqAll)
+       cidSize := len(cidAll)
+       mod := mqSize % cidSize
+
+       var averageSize int
+       if mqSize <= cidSize {
+               averageSize = 1
+       } else {
+               if mod > 0 && index < mod {
+                       averageSize = mqSize/cidSize + 1
+               } else {
+                       averageSize = mqSize / cidSize
+               }
+       }
+
+       var startIndex int
+       if mod > 0 && index < mod {
+               startIndex = index * averageSize
+       } else {
+               startIndex = index*averageSize + mod
+       }
+
+       num := utils.MinInt(averageSize, mqSize-startIndex)
+       result := make([]*MessageQueue, num)
+       for i := 0; i < num; i++ {
+               result[i] = mqAll[(startIndex+i)%mqSize]
+       }
+       return result
+}
+
+// TODO
+func AllocateByMachineNearby(consumerGroup, currentCID string, mqAll 
[]*MessageQueue,
+       cidAll []string) []*MessageQueue {
+       return AllocateByAveragely(consumerGroup, currentCID, mqAll, cidAll)
+}
+
+func AllocateByAveragelyCircle(consumerGroup, currentCID string, mqAll 
[]*MessageQueue,
+       cidAll []string) []*MessageQueue {
+       return AllocateByAveragely(consumerGroup, currentCID, mqAll, cidAll)
+}
+
+func AllocateByConfig(consumerGroup, currentCID string, mqAll []*MessageQueue,
+       cidAll []string) []*MessageQueue {
+       return AllocateByAveragely(consumerGroup, currentCID, mqAll, cidAll)
+}
+
+func AllocateByMachineRoom(consumerGroup, currentCID string, mqAll 
[]*MessageQueue,
+       cidAll []string) []*MessageQueue {
+       return AllocateByAveragely(consumerGroup, currentCID, mqAll, cidAll)
+}
+
+func AllocateByConsistentHash(consumerGroup, currentCID string, mqAll 
[]*MessageQueue,
+       cidAll []string) []*MessageQueue {
+       return AllocateByAveragely(consumerGroup, currentCID, mqAll, cidAll)
+}

Reply via email to