This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch INLONG-25
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git

commit 0d4ed41f647eb5155c936f10f0f61113c44161b3
Author: Zijie Lu <[email protected]>
AuthorDate: Thu Jun 3 16:02:09 2021 +0800

    [INLONG-624]Go SDK consumer interface
    
    Signed-off-by: Zijie Lu <[email protected]>
---
 .../{metadata/node.go => client/consumer.go}       |  51 ++-
 .../tubemq-client-go/client/consumer_impl.go       | 359 +++++++++++++++++++++
 .../tubemq-client-go/client/heartbeat.go           | 229 +++++++++++++
 .../tubemq-client-go/client/remote.go              |  79 -----
 .../tubemq-client-go/config/config.go              |  18 +-
 .../tubemq-client-go/config/config_test.go         |  10 +-
 tubemq-client-twins/tubemq-client-go/errs/errs.go  |  12 +-
 .../tubemq-client-go/metadata/consumer_event.go    |   8 +
 .../tubemq-client-go/metadata/metadata.go          |  20 ++
 .../tubemq-client-go/metadata/node.go              |  54 ++++
 .../tubemq-client-go/metadata/partition.go         |  42 ++-
 .../tubemq-client-go/metadata/subcribe_info.go     |  54 +++-
 .../tubemq-client-go/remote/remote.go              | 331 +++++++++++++++++++
 tubemq-client-twins/tubemq-client-go/rpc/broker.go |  32 +-
 tubemq-client-twins/tubemq-client-go/rpc/client.go |  23 +-
 tubemq-client-twins/tubemq-client-go/rpc/master.go |  19 +-
 .../tubemq-client-go/{client => sub}/info.go       |  77 ++++-
 .../tubemq-client-go/transport/client.go           |   8 +-
 tubemq-client-twins/tubemq-client-go/util/util.go  |  52 +++
 19 files changed, 1299 insertions(+), 179 deletions(-)

diff --git a/tubemq-client-twins/tubemq-client-go/metadata/node.go 
b/tubemq-client-twins/tubemq-client-go/client/consumer.go
similarity index 52%
copy from tubemq-client-twins/tubemq-client-go/metadata/node.go
copy to tubemq-client-twins/tubemq-client-go/client/consumer.go
index 625f278..27a8536 100644
--- a/tubemq-client-twins/tubemq-client-go/metadata/node.go
+++ b/tubemq-client-twins/tubemq-client-go/client/consumer.go
@@ -15,41 +15,32 @@
  * limitations under the License.
  */
 
-package metadata
+// Package client defines the api and information
+// which can be exposed to user.
+package client
 
-import (
-       "strconv"
+const (
+       tubeMQClientVersion = "0.1.0"
 )
 
-// Node represents the metadata of a node.
-type Node struct {
-       id      uint32
-       host    string
-       port    uint32
-       address string
+// ConsumerResult of a consumption.
+type ConsumerResult struct {
 }
 
-// GetID returns the id of a node.
-func (n *Node) GetID() uint32 {
-       return n.id
+// ConsumerOffset of a consumption,
+type ConsumerOffset struct {
 }
 
-// GetPort returns the port of a node.
-func (n *Node) GetPort() uint32 {
-       return n.port
-}
-
-// GetHost returns the hostname of a node.
-func (n *Node) GetHost() string {
-       return n.host
-}
-
-// GetAddress returns the address of a node.
-func (n *Node) GetAddress() string {
-       return n.address
-}
-
-// String returns the metadata of a node as a string.
-func (n *Node) String() string {
-       return strconv.Itoa(int(n.id)) + ":" + n.host + ":" + 
strconv.Itoa(int(n.port))
+var clientIndex uint64
+
+// Consumer is an interface that abstracts behavior of TubeMQ's consumer
+type Consumer interface {
+       // Start starts the consumer.
+       Start() error
+       // GetMessage receive a single message.
+       GetMessage() (*ConsumerResult, error)
+       // Confirm the consumption of a message.
+       Confirm(confirmContext string, consumed bool) (*ConsumerResult, error)
+       // GetCurrConsumedInfo returns the consumptions of the consumer.
+       GetCurrConsumedInfo() (map[string]*ConsumerOffset, error)
 }
diff --git a/tubemq-client-twins/tubemq-client-go/client/consumer_impl.go 
b/tubemq-client-twins/tubemq-client-go/client/consumer_impl.go
new file mode 100644
index 0000000..c0f7e9a
--- /dev/null
+++ b/tubemq-client-twins/tubemq-client-go/client/consumer_impl.go
@@ -0,0 +1,359 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package client
+
+import (
+       "context"
+       "os"
+       "strconv"
+       "sync/atomic"
+       "time"
+
+       "github.com/golang/protobuf/proto"
+
+       
"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/config"
+       
"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/errs"
+       
"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/metadata"
+       
"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/multiplexing"
+       
"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/protocol"
+       
"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/remote"
+       
"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/rpc"
+       
"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/selector"
+       
"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/sub"
+       
"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/transport"
+       
"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/util"
+)
+
+const (
+       consumeStatusNormal        = 0
+       consumeStatusFromMax       = 1
+       consumeStatusFromMaxAlways = 2
+)
+
+type consumer struct {
+       clientID         string
+       config           *config.Config
+       subInfo          *sub.SubInfo
+       rmtDataCache     *remote.RmtDataCache
+       visitToken       int64
+       authorizedInfo   string
+       nextAuth2Master  int32
+       nextAuth2Broker  int32
+       master           *selector.Node
+       client           rpc.RPCClient
+       selector         selector.Selector
+       lastMasterHb     int64
+       masterHBRetry    int
+       heartbeatManager *heartbeatManager
+       unreportedTimes  int
+}
+
+// NewConsumer returns a consumer which is constructed by a given config.
+func NewConsumer(config *config.Config) (Consumer, error) {
+       selector, err := selector.Get("ip")
+       if err != nil {
+               return nil, err
+       }
+
+       clientID := newClientID(config.Consumer.Group)
+       pool := multiplexing.NewPool()
+       opts := &transport.Options{}
+       if config.Net.TLS.Enable {
+               opts.CACertFile = config.Net.TLS.CACertFile
+               opts.TLSCertFile = config.Net.TLS.TLSCertFile
+               opts.TLSKeyFile = config.Net.TLS.TLSKeyFile
+               opts.TLSServerName = config.Net.TLS.TLSServerName
+       }
+       client := rpc.New(pool, opts)
+       r := remote.NewRmtDataCache()
+       r.SetConsumerInfo(clientID, config.Consumer.Group)
+       c := &consumer{
+               config:          config,
+               clientID:        clientID,
+               subInfo:         sub.NewSubInfo(config),
+               rmtDataCache:    r,
+               selector:        selector,
+               client:          client,
+               visitToken:      util.InvalidValue,
+               unreportedTimes: 0,
+       }
+       c.subInfo.SetClientID(clientID)
+       hbm := newHBManager(c)
+       c.heartbeatManager = hbm
+       return c, nil
+}
+
+// Start implementation of tubeMQ consumer.
+func (c *consumer) Start() error {
+       err := c.register2Master(false)
+       if err != nil {
+               return err
+       }
+       c.heartbeatManager.registerMaster(c.master.Address)
+       go c.processRebalanceEvent()
+       return nil
+}
+
+func (c *consumer) register2Master(needChange bool) error {
+       if needChange {
+               node, err := c.selector.Select(c.config.Consumer.Masters)
+               if err != nil {
+                       return err
+               }
+               c.master = node
+       }
+       for c.master.HasNext {
+               ctx, cancel := context.WithTimeout(context.Background(), 
c.config.Net.ReadTimeout)
+
+               m := &metadata.Metadata{}
+               node := &metadata.Node{}
+               node.SetHost(util.GetLocalHost())
+               node.SetAddress(c.master.Address)
+               m.SetNode(node)
+               sub := &metadata.SubscribeInfo{}
+               sub.SetGroup(c.config.Consumer.Group)
+               m.SetSubscribeInfo(sub)
+
+               auth := &protocol.AuthenticateInfo{}
+               c.genMasterAuthenticateToken(auth, true)
+               mci := &protocol.MasterCertificateInfo{
+                       AuthInfo: auth,
+               }
+               c.subInfo.SetMasterCertificateInfo(mci)
+
+               rsp, err := c.client.RegisterRequestC2M(ctx, m, c.subInfo, 
c.rmtDataCache)
+               if err != nil {
+                       cancel()
+                       return err
+               }
+               if rsp.GetSuccess() {
+                       c.masterHBRetry = 0
+                       c.processRegisterResponseM2C(rsp)
+                       cancel()
+                       return nil
+               } else if rsp.GetErrCode() == errs.RetConsumeGroupForbidden || 
rsp.GetErrCode() == errs.RetConsumeContentForbidden {
+                       cancel()
+                       return nil
+               } else {
+                       c.master, err = 
c.selector.Select(c.config.Consumer.Masters)
+                       cancel()
+                       if err != nil {
+                               return err
+                       }
+               }
+       }
+       return nil
+}
+
+func (c *consumer) processRegisterResponseM2C(rsp 
*protocol.RegisterResponseM2C) {
+       if rsp.GetNotAllocated() {
+               c.subInfo.SetIsNotAllocated(rsp.GetNotAllocated())
+       }
+       if rsp.GetDefFlowCheckId() != 0 || rsp.GetDefFlowCheckId() != 0 {
+               if rsp.GetDefFlowCheckId() != 0 {
+                       
c.rmtDataCache.UpdateDefFlowCtrlInfo(rsp.GetDefFlowCheckId(), 
rsp.GetDefFlowControlInfo())
+               }
+               qryPriorityID := c.rmtDataCache.GetQryPriorityID()
+               if rsp.GetQryPriorityId() != 0 {
+                       qryPriorityID = rsp.GetQryPriorityId()
+               }
+               c.rmtDataCache.UpdateGroupFlowCtrlInfo(qryPriorityID, 
rsp.GetGroupFlowCheckId(), rsp.GetGroupFlowControlInfo())
+       }
+       if rsp.GetAuthorizedInfo() != nil {
+               c.processAuthorizedToken(rsp.GetAuthorizedInfo())
+       }
+       c.lastMasterHb = time.Now().UnixNano() / int64(time.Millisecond)
+}
+
+func (c *consumer) processAuthorizedToken(info *protocol.MasterAuthorizedInfo) 
{
+       atomic.StoreInt64(&c.visitToken, info.GetVisitAuthorizedToken())
+       c.authorizedInfo = info.GetAuthAuthorizedToken()
+}
+
+// GetMessage implementation of TubeMQ consumer.
+func (c *consumer) GetMessage() (*ConsumerResult, error) {
+       panic("implement me")
+}
+
+// Confirm implementation of TubeMQ consumer.
+func (c *consumer) Confirm(confirmContext string, consumed bool) 
(*ConsumerResult, error) {
+       panic("implement me")
+}
+
+// GetCurrConsumedInfo implementation of TubeMQ consumer.
+func (c *consumer) GetCurrConsumedInfo() (map[string]*ConsumerOffset, error) {
+       panic("implement me")
+}
+
+func (c *consumer) processRebalanceEvent() {
+       for {
+               event := c.rmtDataCache.TakeEvent()
+               if event == nil {
+                       continue
+               }
+               if event.GetEventStatus() == int32(util.InvalidValue) && 
event.GetRebalanceID() == util.InvalidValue {
+                       break
+               }
+               c.rmtDataCache.ClearEvent()
+               switch event.GetEventType() {
+               case 2, 20:
+                       c.disconnect2Broker(event)
+                       c.rmtDataCache.OfferEvent(event)
+               case 1, 10:
+                       c.connect2Broker(event)
+                       c.rmtDataCache.OfferEvent(event)
+               }
+       }
+}
+
+func (c *consumer) disconnect2Broker(event *metadata.ConsumerEvent) {
+       subscribeInfo := event.GetSubscribeInfo()
+       if len(subscribeInfo) > 0 {
+               removedPartitions := 
make(map[*metadata.Node][]*metadata.Partition)
+               c.rmtDataCache.RemoveAndGetPartition(subscribeInfo, 
c.config.Consumer.RollbackIfConfirmTimeout, removedPartitions)
+               if len(removedPartitions) > 0 {
+                       c.unregister2Broker(removedPartitions)
+               }
+       }
+       event.SetEventStatus(2)
+}
+
+func (c *consumer) unregister2Broker(unRegPartitions 
map[*metadata.Node][]*metadata.Partition) {
+       if len(unRegPartitions) == 0 {
+               return
+       }
+
+       for _, partitions := range unRegPartitions {
+               for _, partition := range partitions {
+                       ctx, cancel := 
context.WithTimeout(context.Background(), c.config.Net.ReadTimeout)
+
+                       m := &metadata.Metadata{}
+                       node := &metadata.Node{}
+                       node.SetHost(util.GetLocalHost())
+                       node.SetAddress(partition.GetBroker().GetAddress())
+                       m.SetNode(node)
+                       m.SetReadStatus(1)
+                       sub := &metadata.SubscribeInfo{}
+                       sub.SetGroup(c.config.Consumer.Group)
+                       sub.SetConsumerID(c.clientID)
+                       sub.SetPartition(partition)
+                       m.SetSubscribeInfo(sub)
+
+                       c.client.UnregisterRequestC2B(ctx, m, c.subInfo)
+                       cancel()
+               }
+       }
+}
+
+func (c *consumer) connect2Broker(event *metadata.ConsumerEvent) {
+       if len(event.GetSubscribeInfo()) > 0 {
+               unsubPartitions := 
c.rmtDataCache.FilterPartitions(event.GetSubscribeInfo())
+               if len(unsubPartitions) > 0 {
+                       for _, partition := range unsubPartitions {
+                               m := &metadata.Metadata{}
+                               node := &metadata.Node{}
+                               node.SetHost(util.GetLocalHost())
+                               
node.SetAddress(partition.GetBroker().GetAddress())
+                               m.SetNode(node)
+                               sub := &metadata.SubscribeInfo{}
+                               sub.SetGroup(c.config.Consumer.Group)
+                               sub.SetConsumerID(c.clientID)
+                               sub.SetPartition(partition)
+                               m.SetSubscribeInfo(sub)
+                               isFirstRegister := 
c.rmtDataCache.IsFirstRegister(partition.GetPartitionKey())
+                               
m.SetReadStatus(c.getConsumeReadStatus(isFirstRegister))
+                               auth := c.genBrokerAuthenticInfo(true)
+                               c.subInfo.SetAuthorizedInfo(auth)
+
+                               ctx, cancel := 
context.WithTimeout(context.Background(), c.config.Net.ReadTimeout)
+                               rsp, err := c.client.RegisterRequestC2B(ctx, m, 
c.subInfo, c.rmtDataCache)
+                               if err != nil {
+                                       //todo add log
+                               }
+                               if rsp.GetSuccess() {
+                                       
c.rmtDataCache.AddNewPartition(partition)
+                                       c.heartbeatManager.registerBroker(node)
+                               }
+                               cancel()
+                       }
+               }
+       }
+       c.subInfo.FirstRegistered()
+       event.SetEventStatus(2)
+}
+
+func newClientIndex() uint64 {
+       return atomic.AddUint64(&clientIndex, 1)
+}
+
+func newClientID(group string) string {
+       return group + "_" +
+               util.GetLocalHost() + "_" +
+               strconv.Itoa(os.Getpid()) + "_" +
+               strconv.Itoa(int(time.Now().Unix()*1000)) + "_" +
+               strconv.Itoa(int(newClientIndex())) + "_" +
+               tubeMQClientVersion
+}
+
+func (c *consumer) genBrokerAuthenticInfo(force bool) *protocol.AuthorizedInfo 
{
+       needAdd := false
+       auth := &protocol.AuthorizedInfo{}
+       if c.config.Net.Auth.Enable {
+               if force {
+                       needAdd = true
+                       atomic.StoreInt32(&c.nextAuth2Broker, 0)
+               } else if atomic.LoadInt32(&c.nextAuth2Broker) == 1 {
+                       if atomic.CompareAndSwapInt32(&c.nextAuth2Broker, 1, 0) 
{
+                               needAdd = true
+                       }
+               }
+               if needAdd {
+                       authToken := 
util.GenBrokerAuthenticateToken(c.config.Net.Auth.UserName, 
c.config.Net.Auth.Password)
+                       auth.AuthAuthorizedToken = proto.String(authToken)
+               }
+       }
+       return auth
+}
+
+func (c *consumer) genMasterAuthenticateToken(auth *protocol.AuthenticateInfo, 
force bool) {
+       needAdd := false
+       if c.config.Net.Auth.Enable {
+               if force {
+                       needAdd = true
+                       atomic.StoreInt32(&c.nextAuth2Master, 0)
+               } else if atomic.LoadInt32(&c.nextAuth2Master) == 1 {
+                       if atomic.CompareAndSwapInt32(&c.nextAuth2Master, 1, 0) 
{
+                               needAdd = true
+                       }
+               }
+               if needAdd {
+               }
+       }
+}
+
+func (c *consumer) getConsumeReadStatus(isFirstReg bool) int32 {
+       readStatus := consumeStatusNormal
+       if isFirstReg {
+               if c.config.Consumer.ConsumePosition == 0 {
+                       readStatus = consumeStatusFromMax
+               } else if c.config.Consumer.ConsumePosition > 0 {
+                       readStatus = consumeStatusFromMaxAlways
+               }
+       }
+       return int32(readStatus)
+}
diff --git a/tubemq-client-twins/tubemq-client-go/client/heartbeat.go 
b/tubemq-client-twins/tubemq-client-go/client/heartbeat.go
new file mode 100644
index 0000000..9c29854
--- /dev/null
+++ b/tubemq-client-twins/tubemq-client-go/client/heartbeat.go
@@ -0,0 +1,229 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package client
+
+import (
+       "context"
+       "strings"
+       "sync"
+       "sync/atomic"
+       "time"
+
+       
"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/errs"
+       
"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/metadata"
+       
"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/protocol"
+       
"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/util"
+)
+
+type heartbeatMetadata struct {
+       numConnections int
+       timer          *time.Timer
+}
+
+type heartbeatManager struct {
+       consumer   *consumer
+       heartbeats map[string]*heartbeatMetadata
+       mu         sync.Mutex
+}
+
+func newHBManager(consumer *consumer) *heartbeatManager {
+       return &heartbeatManager{
+               consumer:   consumer,
+               heartbeats: make(map[string]*heartbeatMetadata),
+       }
+}
+
+func (h *heartbeatManager) registerMaster(address string) {
+       h.mu.Lock()
+       defer h.mu.Unlock()
+       if _, ok := h.heartbeats[address]; !ok {
+               h.heartbeats[address] = &heartbeatMetadata{
+                       numConnections: 1,
+                       timer:          
time.AfterFunc(h.consumer.config.Heartbeat.Interval/2, h.consumerHB2Master),
+               }
+       }
+       hm := h.heartbeats[address]
+       hm.numConnections++
+}
+
+func (h *heartbeatManager) registerBroker(broker *metadata.Node) {
+       h.mu.Lock()
+       defer h.mu.Unlock()
+
+       if _, ok := h.heartbeats[broker.GetAddress()]; !ok {
+               h.heartbeats[broker.GetAddress()] = &heartbeatMetadata{
+                       numConnections: 1,
+                       timer:          
time.AfterFunc(h.consumer.config.Heartbeat.Interval, func() { 
h.consumerHB2Broker(broker) }),
+               }
+       }
+       hm := h.heartbeats[broker.GetAddress()]
+       hm.numConnections++
+}
+
+func (h *heartbeatManager) consumerHB2Master() {
+       if 
time.Now().UnixNano()/int64(time.Millisecond)-h.consumer.lastMasterHb > 30000 {
+               
h.consumer.rmtDataCache.HandleExpiredPartitions(h.consumer.config.Consumer.MaxConfirmWait)
+       }
+       m := &metadata.Metadata{}
+       node := &metadata.Node{}
+       node.SetHost(util.GetLocalHost())
+       node.SetAddress(h.consumer.master.Address)
+       m.SetNode(node)
+       sub := &metadata.SubscribeInfo{}
+       sub.SetGroup(h.consumer.config.Consumer.Group)
+       m.SetSubscribeInfo(sub)
+       h.consumer.unreportedTimes++
+       if h.consumer.unreportedTimes > 
h.consumer.config.Consumer.MaxSubInfoReportInterval {
+               m.SetReportTimes(true)
+               h.consumer.unreportedTimes = 0
+       }
+
+       retry := 0
+       for retry < h.consumer.config.Heartbeat.MaxRetryTimes {
+               ctx, cancel := context.WithTimeout(context.Background(), 
h.consumer.config.Net.ReadTimeout)
+               rsp, err := h.consumer.client.HeartRequestC2M(ctx, m, 
h.consumer.subInfo, h.consumer.rmtDataCache)
+               if err != nil {
+                       cancel()
+               }
+               if rsp.GetSuccess() {
+                       cancel()
+                       h.processHBResponseM2C(rsp)
+                       break
+               } else if rsp.GetErrCode() == errs.RetErrHBNoNode || 
strings.Index(rsp.GetErrMsg(), "StandbyException") != -1 {
+                       cancel()
+                       h.consumer.masterHBRetry++
+                       address := h.consumer.master.Address
+                       go h.consumer.register2Master(rsp.GetErrCode() != 
errs.RetErrHBNoNode)
+                       if rsp.GetErrCode() != errs.RetErrHBNoNode {
+                               hm := h.heartbeats[address]
+                               hm.numConnections--
+                               if hm.numConnections == 0 {
+                                       h.mu.Lock()
+                                       delete(h.heartbeats, address)
+                                       h.mu.Unlock()
+                               }
+                       }
+                       return
+               }
+               cancel()
+       }
+       h.mu.Lock()
+       defer h.mu.Unlock()
+       hm := h.heartbeats[h.consumer.master.Address]
+       hm.timer.Reset(h.nextHeartbeatInterval())
+}
+
+func (h *heartbeatManager) processHBResponseM2C(rsp 
*protocol.HeartResponseM2C) {
+       h.consumer.masterHBRetry = 0
+       h.consumer.subInfo.SetIsNotAllocated(rsp.GetNotAllocated())
+       if rsp.GetDefFlowCheckId() != 0 || rsp.GetGroupFlowCheckId() != 0 {
+               if rsp.GetDefFlowCheckId() != 0 {
+                       
h.consumer.rmtDataCache.UpdateDefFlowCtrlInfo(rsp.GetDefFlowCheckId(), 
rsp.GetDefFlowControlInfo())
+               }
+               qryPriorityID := h.consumer.rmtDataCache.GetQryPriorityID()
+               if rsp.GetQryPriorityId() != 0 {
+                       qryPriorityID = rsp.GetQryPriorityId()
+               }
+               h.consumer.rmtDataCache.UpdateGroupFlowCtrlInfo(qryPriorityID, 
rsp.GetGroupFlowCheckId(), rsp.GetGroupFlowControlInfo())
+       }
+       if rsp.GetAuthorizedInfo() != nil {
+               h.consumer.processAuthorizedToken(rsp.GetAuthorizedInfo())
+       }
+       if rsp.GetRequireAuth() {
+               atomic.StoreInt32(&h.consumer.nextAuth2Master, 1)
+       }
+       if rsp.GetEvent() != nil {
+               event := rsp.GetEvent()
+               subscribeInfo := make([]*metadata.SubscribeInfo, 0, 
len(event.GetSubscribeInfo()))
+               for _, sub := range event.GetSubscribeInfo() {
+                       s, err := metadata.NewSubscribeInfo(sub)
+                       if err != nil {
+                               continue
+                       }
+                       subscribeInfo = append(subscribeInfo, s)
+               }
+               e := metadata.NewEvent(event.GetRebalanceId(), 
event.GetOpType(), subscribeInfo)
+               h.consumer.rmtDataCache.OfferEvent(e)
+       }
+}
+
+func (h *heartbeatManager) nextHeartbeatInterval() time.Duration {
+       interval := h.consumer.config.Heartbeat.Interval
+       if h.consumer.masterHBRetry >= 
h.consumer.config.Heartbeat.MaxRetryTimes {
+               interval = h.consumer.config.Heartbeat.AfterFail
+       }
+       return interval
+}
+
+func (h *heartbeatManager) consumerHB2Broker(broker *metadata.Node) {
+       h.mu.Lock()
+       defer h.mu.Unlock()
+
+       partitions := h.consumer.rmtDataCache.GetPartitionByBroker(broker)
+       if len(partitions) == 0 {
+               h.resetBrokerTimer(broker)
+               return
+       }
+       m := &metadata.Metadata{}
+       m.SetReadStatus(h.consumer.getConsumeReadStatus(false))
+       m.SetNode(broker)
+       ctx, cancel := context.WithTimeout(context.Background(), 
h.consumer.config.Net.ReadTimeout)
+       defer cancel()
+
+       rsp, err := h.consumer.client.HeartbeatRequestC2B(ctx, m, 
h.consumer.subInfo, h.consumer.rmtDataCache)
+       if err != nil {
+               return
+       }
+       if rsp.GetSuccess() {
+               if rsp.GetHasPartFailure() {
+                       partitionKeys := make([]string, 0, 
len(rsp.GetFailureInfo()))
+                       for _, fi := range rsp.GetFailureInfo() {
+                               pos := strings.Index(fi, ":")
+                               if pos == -1 {
+                                       continue
+                               }
+                               partition, err := 
metadata.NewPartition(fi[pos+1:])
+                               if err != nil {
+                                       continue
+                               }
+                               partitionKeys = append(partitionKeys, 
partition.GetPartitionKey())
+                       }
+                       h.consumer.rmtDataCache.RemovePartition(partitionKeys)
+               } else {
+                       if rsp.GetErrCode() == errs.RetCertificateFailure {
+                               partitionKeys := make([]string, 0, 
len(partitions))
+                               for _, partition := range partitions {
+                                       partitionKeys = append(partitionKeys, 
partition.GetPartitionKey())
+                               }
+                               
h.consumer.rmtDataCache.RemovePartition(partitionKeys)
+                       }
+               }
+       }
+       h.resetBrokerTimer(broker)
+}
+
+func (h *heartbeatManager) resetBrokerTimer(broker *metadata.Node) {
+       interval := h.consumer.config.Heartbeat.Interval
+       partitions := h.consumer.rmtDataCache.GetPartitionByBroker(broker)
+       if len(partitions) == 0 {
+               delete(h.heartbeats, broker.GetAddress())
+       } else {
+               hm := h.heartbeats[broker.GetAddress()]
+               hm.timer.Reset(interval)
+       }
+}
diff --git a/tubemq-client-twins/tubemq-client-go/client/remote.go 
b/tubemq-client-twins/tubemq-client-go/client/remote.go
deleted file mode 100644
index e36b69f..0000000
--- a/tubemq-client-twins/tubemq-client-go/client/remote.go
+++ /dev/null
@@ -1,79 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-// Package client defines the api and information
-// which can be exposed to user.
-package client
-
-import (
-       "sync"
-
-       
"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/metadata"
-)
-
-// RmtDataCache represents the data returned from TubeMQ.
-type RmtDataCache struct {
-       consumerID         string
-       groupName          string
-       underGroupCtrl     bool
-       defFlowCtrlID      int64
-       groupFlowCtrlID    int64
-       subscribeInfo      []*metadata.SubscribeInfo
-       rebalanceResults   []*metadata.ConsumerEvent
-       mu                 sync.Mutex
-       brokerToPartitions map[*metadata.Node][]*metadata.Partition
-}
-
-// GetUnderGroupCtrl returns the underGroupCtrl.
-func (r *RmtDataCache) GetUnderGroupCtrl() bool {
-       return r.underGroupCtrl
-}
-
-// GetDefFlowCtrlID returns the defFlowCtrlID.
-func (r *RmtDataCache) GetDefFlowCtrlID() int64 {
-       return r.defFlowCtrlID
-}
-
-// GetGroupFlowCtrlID returns the groupFlowCtrlID.
-func (r *RmtDataCache) GetGroupFlowCtrlID() int64 {
-       return r.groupFlowCtrlID
-}
-
-// GetSubscribeInfo returns the subscribeInfo.
-func (r *RmtDataCache) GetSubscribeInfo() []*metadata.SubscribeInfo {
-       return r.subscribeInfo
-}
-
-// PollEventResult polls the first event result from the rebalanceResults.
-func (r *RmtDataCache) PollEventResult() *metadata.ConsumerEvent {
-       r.mu.Lock()
-       defer r.mu.Unlock()
-       if len(r.rebalanceResults) > 0 {
-               event := r.rebalanceResults[0]
-               r.rebalanceResults = r.rebalanceResults[1:]
-               return event
-       }
-       return nil
-}
-
-// GetPartitionByBroker returns the
-func (r *RmtDataCache) GetPartitionByBroker(node *metadata.Node) 
[]*metadata.Partition {
-       if partitions, ok := r.brokerToPartitions[node]; ok {
-               return partitions
-       }
-       return nil
-}
diff --git a/tubemq-client-twins/tubemq-client-go/config/config.go 
b/tubemq-client-twins/tubemq-client-go/config/config.go
index 47a360a..7a93831 100644
--- a/tubemq-client-twins/tubemq-client-go/config/config.go
+++ b/tubemq-client-twins/tubemq-client-go/config/config.go
@@ -29,7 +29,7 @@ import (
 // Config defines multiple configuration options.
 // Refer to: 
https://github.com/apache/incubator-inlong/blob/3249de37acf054a9c43677131cfbb09fc6d366d1/tubemq-client/src/main/java/org/apache/tubemq/client/config/ConsumerConfig.java
 type Config struct {
-       // Net is the namespace for network-level properties used by Broker and 
Master.
+       // Net is the namespace for network-level properties used by broker and 
Master.
        Net struct {
                // ReadTimeout represents how long to wait for a response.
                ReadTimeout time.Duration
@@ -61,9 +61,13 @@ type Config struct {
        // used by the consumer
        Consumer struct {
                // Masters is the addresses of master.
-               Masters []string
-               // Topic of the consumption.
-               Topic string
+               Masters string
+               // Topics of the consumption.
+               Topics []string
+               // TopicFilters is the map of topic to filters.
+               TopicFilters map[string][]string
+               // PartitionOffset is the map of partition to its corresponding 
offset.
+               PartitionOffset map[string]int64
                // ConsumerPosition is the initial offset to use if no offset 
was previously committed.
                ConsumePosition int
                // Group is the consumer group name.
@@ -152,7 +156,7 @@ func ParseAddress(address string) (config *Config, err 
error) {
                return nil, fmt.Errorf("address format invalid: address: %v, 
token: %v", address, tokens)
        }
 
-       c.Consumer.Masters = strings.Split(tokens[0], ",")
+       c.Consumer.Masters = tokens[0]
 
        tokens = strings.Split(tokens[1], "&")
        if len(tokens) == 0 {
@@ -190,8 +194,8 @@ func getConfigFromToken(config *Config, values []string) 
error {
                config.Net.TLS.TLSServerName = values[1]
        case "group":
                config.Consumer.Group = values[1]
-       case "topic":
-               config.Consumer.Topic = values[1]
+       case "topics":
+               config.Consumer.Topics = strings.Split(values[1], ",")
        case "consumePosition":
                config.Consumer.ConsumePosition, err = strconv.Atoi(values[1])
        case "boundConsume":
diff --git a/tubemq-client-twins/tubemq-client-go/config/config_test.go 
b/tubemq-client-twins/tubemq-client-go/config/config_test.go
index 56bc600..a8ac703 100644
--- a/tubemq-client-twins/tubemq-client-go/config/config_test.go
+++ b/tubemq-client-twins/tubemq-client-go/config/config_test.go
@@ -25,11 +25,11 @@ import (
 )
 
 func TestParseAddress(t *testing.T) {
-       address := 
"127.0.0.1:9092,127.0.0.1:9093?topic=Topic&group=Group&tlsEnable=false&msgNotFoundWait=10000&heartbeatMaxRetryTimes=6"
+       address := 
"127.0.0.1:9092,127.0.0.1:9093?topics=Topic1,Topic2&group=Group&tlsEnable=false&msgNotFoundWait=10000&heartbeatMaxRetryTimes=6"
        c, err := ParseAddress(address)
        assert.Nil(t, err)
-       assert.Equal(t, c.Consumer.Masters, []string{"127.0.0.1:9092", 
"127.0.0.1:9093"})
-       assert.Equal(t, c.Consumer.Topic, "Topic")
+       assert.Equal(t, c.Consumer.Masters, "127.0.0.1:9092,127.0.0.1:9093")
+       assert.Equal(t, c.Consumer.Topics, []string{"Topic1", "Topic2"})
        assert.Equal(t, c.Consumer.Group, "Group")
        assert.Equal(t, c.Consumer.MsgNotFoundWait, 10000*time.Millisecond)
 
@@ -41,11 +41,11 @@ func TestParseAddress(t *testing.T) {
        _, err = ParseAddress(address)
        assert.NotNil(t, err)
 
-       address = "127.0.0.1:9092,127.0.0.1:9093?Topic=Topic&ttt"
+       address = "127.0.0.1:9092,127.0.0.1:9093?topics=Topic&ttt"
        _, err = ParseAddress(address)
        assert.NotNil(t, err)
 
-       address = "127.0.0.1:9092,127.0.0.1:9093?Topic=Topic&ttt=ttt"
+       address = "127.0.0.1:9092,127.0.0.1:9093?topics=Topic&ttt=ttt"
        _, err = ParseAddress(address)
        assert.NotNil(t, err)
 }
diff --git a/tubemq-client-twins/tubemq-client-go/errs/errs.go 
b/tubemq-client-twins/tubemq-client-go/errs/errs.go
index 00b3c81..eb45179 100644
--- a/tubemq-client-twins/tubemq-client-go/errs/errs.go
+++ b/tubemq-client-twins/tubemq-client-go/errs/errs.go
@@ -33,12 +33,18 @@ const (
        RetAssertionFailure = 4
        // RetRequestFailure represents the error code of request error.
        RetRequestFailure = 5
-       // RetSelectorNotExist = 6
-       RetSelectorNotExist = 6
+       // RetSelectorNotExist represents the selector not exists.
+       RetSelectorNotExist        = 6
+       RetErrHBNoNode             = 411
+       RetCertificateFailure      = 415
+       RetConsumeGroupForbidden   = 450
+       RetConsumeContentForbidden = 455
 )
 
 // ErrAssertionFailure represents RetAssertionFailure error.
-var ErrAssertionFailure = New(RetAssertionFailure, "AssertionFailure")
+var (
+       ErrAssertionFailure = New(RetAssertionFailure, "AssertionFailure")
+)
 
 // Error provides a TubeMQ-specific error container
 type Error struct {
diff --git a/tubemq-client-twins/tubemq-client-go/metadata/consumer_event.go 
b/tubemq-client-twins/tubemq-client-go/metadata/consumer_event.go
index 2b4ce49..6ce2915 100644
--- a/tubemq-client-twins/tubemq-client-go/metadata/consumer_event.go
+++ b/tubemq-client-twins/tubemq-client-go/metadata/consumer_event.go
@@ -25,6 +25,14 @@ type ConsumerEvent struct {
        subscribeInfo []*SubscribeInfo
 }
 
+func NewEvent(rebalanceID int64, eventType int32, subscribeInfo 
[]*SubscribeInfo) *ConsumerEvent {
+       return &ConsumerEvent{
+               rebalanceID:   rebalanceID,
+               eventType:     eventType,
+               subscribeInfo: subscribeInfo,
+       }
+}
+
 // GetRebalanceID returns the rebalanceID of a consumer event.
 func (c *ConsumerEvent) GetRebalanceID() int64 {
        return c.rebalanceID
diff --git a/tubemq-client-twins/tubemq-client-go/metadata/metadata.go 
b/tubemq-client-twins/tubemq-client-go/metadata/metadata.go
index 861a06e..f64674a 100644
--- a/tubemq-client-twins/tubemq-client-go/metadata/metadata.go
+++ b/tubemq-client-twins/tubemq-client-go/metadata/metadata.go
@@ -45,3 +45,23 @@ func (m *Metadata) GetReadStatus() int32 {
 func (m *Metadata) GetReportTimes() bool {
        return m.reportTimes
 }
+
+// SetNode sets the node.
+func (m *Metadata) SetNode(node *Node) {
+       m.node = node
+}
+
+// SetSubscribeInfo sets the subscribeInfo.
+func (m *Metadata) SetSubscribeInfo(sub *SubscribeInfo) {
+       m.subscribeInfo = sub
+}
+
+// ReadStatus sets the status.
+func (m *Metadata) SetReadStatus(status int32) {
+       m.readStatus = status
+}
+
+// SetReportTimes sets the reportTimes.
+func (m *Metadata) SetReportTimes(reportTimes bool) {
+       m.reportTimes = reportTimes
+}
diff --git a/tubemq-client-twins/tubemq-client-go/metadata/node.go 
b/tubemq-client-twins/tubemq-client-go/metadata/node.go
index 625f278..bcfda82 100644
--- a/tubemq-client-twins/tubemq-client-go/metadata/node.go
+++ b/tubemq-client-twins/tubemq-client-go/metadata/node.go
@@ -19,6 +19,7 @@ package metadata
 
 import (
        "strconv"
+       "strings"
 )
 
 // Node represents the metadata of a node.
@@ -29,6 +30,43 @@ type Node struct {
        address string
 }
 
+// NewNode constructs a node from a given string.
+// If the given string is invalid, it will return error.
+func NewNode(isBroker bool, node string) (*Node, error) {
+       res := strings.Split(node, ":")
+       nodeID := 0
+       host := ""
+       port := 8123
+       var err error
+       if isBroker {
+               nodeID, err = strconv.Atoi(res[0])
+               if err != nil {
+                       return nil, err
+               }
+               host = res[1]
+               if len(res) >= 3 {
+                       port, err = strconv.Atoi(res[2])
+                       if err != nil {
+                               return nil, err
+                       }
+               }
+       } else {
+               host = res[0]
+               if len(res) >= 2 {
+                       port, err = strconv.Atoi(res[1])
+                       if err != nil {
+                               return nil, err
+                       }
+               }
+       }
+       return &Node{
+               id:      uint32(nodeID),
+               host:    host,
+               port:    uint32(port),
+               address: host + ":" + strconv.Itoa(port),
+       }, nil
+}
+
 // GetID returns the id of a node.
 func (n *Node) GetID() uint32 {
        return n.id
@@ -53,3 +91,19 @@ func (n *Node) GetAddress() string {
 func (n *Node) String() string {
        return strconv.Itoa(int(n.id)) + ":" + n.host + ":" + 
strconv.Itoa(int(n.port))
 }
+
+// SetHost sets the host.
+func (n *Node) SetHost(host string) {
+       n.host = host
+}
+
+// SetAddress sets the address.
+func (n *Node) SetAddress(address string) error {
+       port, err := strconv.Atoi(strings.Split(address, ":")[1])
+       if err != nil {
+               return err
+       }
+       n.address = address
+       n.port = uint32(port)
+       return nil
+}
diff --git a/tubemq-client-twins/tubemq-client-go/metadata/partition.go 
b/tubemq-client-twins/tubemq-client-go/metadata/partition.go
index a180214..fbe1086 100644
--- a/tubemq-client-twins/tubemq-client-go/metadata/partition.go
+++ b/tubemq-client-twins/tubemq-client-go/metadata/partition.go
@@ -19,18 +19,48 @@ package metadata
 
 import (
        "strconv"
+       "strings"
 )
 
 // Partition represents the metadata of a partition.
 type Partition struct {
        topic        string
-       Broker       *Node
+       broker       *Node
        partitionID  int32
        partitionKey string
        offset       int64
        lastConsumed bool
 }
 
+func NewPartition(partition string) (*Partition, error) {
+       var b *Node
+       var topic string
+       var partitionID int
+       var err error
+       pos := strings.Index(partition, "#")
+       if pos != -1 {
+               broker := partition[:pos]
+               b, err = NewNode(true, broker)
+               if err != nil {
+                       return nil, err
+               }
+               p := partition[pos+1:]
+               pos = strings.Index(p, ":")
+               if pos != -1 {
+                       topic = p[0:pos]
+                       partitionID, err = strconv.Atoi(p[pos+1:])
+                       if err != nil {
+                               return nil, err
+                       }
+               }
+       }
+       return &Partition{
+               topic:       topic,
+               broker:      b,
+               partitionID: int32(partitionID),
+       }, nil
+}
+
 // GetLastConsumed returns lastConsumed of a partition.
 func (p *Partition) GetLastConsumed() bool {
        return p.lastConsumed
@@ -51,7 +81,15 @@ func (p *Partition) GetTopic() string {
        return p.topic
 }
 
+func (p *Partition) GetBroker() *Node {
+       return p.broker
+}
+
 // String returns the metadata of a Partition as a string.
 func (p *Partition) String() string {
-       return p.Broker.String() + "#" + p.topic + "@" + 
strconv.Itoa(int(p.partitionID))
+       return p.broker.String() + "#" + p.topic + "@" + 
strconv.Itoa(int(p.partitionID))
+}
+
+func (p *Partition) SetLastConsumed(lastConsumed bool) {
+       p.lastConsumed = lastConsumed
 }
diff --git a/tubemq-client-twins/tubemq-client-go/metadata/subcribe_info.go 
b/tubemq-client-twins/tubemq-client-go/metadata/subcribe_info.go
index d76437a..90ae3ef 100644
--- a/tubemq-client-twins/tubemq-client-go/metadata/subcribe_info.go
+++ b/tubemq-client-twins/tubemq-client-go/metadata/subcribe_info.go
@@ -19,14 +19,14 @@ package metadata
 
 import (
        "fmt"
+       "strings"
 )
 
 // SubscribeInfo represents the metadata of the subscribe info.
 type SubscribeInfo struct {
-       group         string
-       consumerID    string
-       partition     *Partition
-       qryPriorityID int32
+       group      string
+       consumerID string
+       partition  *Partition
 }
 
 // GetGroup returns the group name.
@@ -44,12 +44,48 @@ func (s *SubscribeInfo) GetPartition() *Partition {
        return s.partition
 }
 
-// GetQryPriorityID returns the QryPriorityID.
-func (s *SubscribeInfo) GetQryPriorityID() int32 {
-       return s.qryPriorityID
-}
-
 // String returns the contents of SubscribeInfo as a string.
 func (s *SubscribeInfo) String() string {
        return fmt.Sprintf("%s@%s-%s", s.consumerID, s.group, 
s.partition.String())
 }
+
+// NewSubscribeInfo constructs a SubscribeInfo from a given string.
+// If the given is invalid, it will return error.
+func NewSubscribeInfo(subscribeInfo string) (*SubscribeInfo, error) {
+       consumerID := ""
+       group := ""
+       var partition *Partition
+       var err error
+       pos := strings.Index(subscribeInfo, "#")
+       if pos != -1 {
+               consumerInfo := subscribeInfo[:pos]
+               partitionInfo := subscribeInfo[pos+1:]
+               partition, err = NewPartition(partitionInfo)
+               if err != nil {
+                       return nil, err
+               }
+               pos = strings.Index(consumerInfo, "@")
+               consumerID = consumerInfo[:pos]
+               group = consumerInfo[pos+1:]
+       }
+       return &SubscribeInfo{
+               group:      group,
+               consumerID: consumerID,
+               partition:  partition,
+       }, nil
+}
+
+// SetPartition sets the partition.
+func (s *SubscribeInfo) SetPartition(partition *Partition) {
+       s.partition = partition
+}
+
+// SetGroup sets the group.
+func (s *SubscribeInfo) SetGroup(group string) {
+       s.group = group
+}
+
+// SetConsumerID sets the consumerID.
+func (s *SubscribeInfo) SetConsumerID(id string) {
+       s.consumerID = id
+}
diff --git a/tubemq-client-twins/tubemq-client-go/remote/remote.go 
b/tubemq-client-twins/tubemq-client-go/remote/remote.go
new file mode 100644
index 0000000..aa9a2d1
--- /dev/null
+++ b/tubemq-client-twins/tubemq-client-go/remote/remote.go
@@ -0,0 +1,331 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// package remote defines the remote data which is returned from TubeMQ.
+package remote
+
+import (
+       "sync"
+       "time"
+
+       
"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/metadata"
+       
"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/util"
+)
+
+// RmtDataCache represents the data returned from TubeMQ.
+type RmtDataCache struct {
+       consumerID         string
+       groupName          string
+       underGroupCtrl     bool
+       defFlowCtrlID      int64
+       groupFlowCtrlID    int64
+       partitionSubInfo   map[string]*metadata.SubscribeInfo
+       rebalanceResults   []*metadata.ConsumerEvent
+       eventMu            sync.Mutex
+       metaMu             sync.Mutex
+       dataBookMu         sync.Mutex
+       brokerPartitions   map[*metadata.Node]map[string]bool
+       qryPriorityID      int32
+       partitions         map[string]*metadata.Partition
+       usedPartitions     map[string]int64
+       indexPartitions    map[string]bool
+       partitionTimeouts  map[string]*time.Timer
+       topicPartitions    map[string]map[string]bool
+       partitionRegBooked map[string]bool
+}
+
+// NewRmtDataCache returns a default rmtDataCache.
+func NewRmtDataCache() *RmtDataCache {
+       return &RmtDataCache{
+               defFlowCtrlID:      util.InvalidValue,
+               groupFlowCtrlID:    util.InvalidValue,
+               qryPriorityID:      int32(util.InvalidValue),
+               partitionSubInfo:   make(map[string]*metadata.SubscribeInfo),
+               rebalanceResults:   make([]*metadata.ConsumerEvent, 0, 0),
+               brokerPartitions:   make(map[*metadata.Node]map[string]bool),
+               partitions:         make(map[string]*metadata.Partition),
+               usedPartitions:     make(map[string]int64),
+               indexPartitions:    make(map[string]bool),
+               partitionTimeouts:  make(map[string]*time.Timer),
+               topicPartitions:    make(map[string]map[string]bool),
+               partitionRegBooked: make(map[string]bool),
+       }
+}
+
+// GetUnderGroupCtrl returns the underGroupCtrl.
+func (r *RmtDataCache) GetUnderGroupCtrl() bool {
+       return r.underGroupCtrl
+}
+
+// GetDefFlowCtrlID returns the defFlowCtrlID.
+func (r *RmtDataCache) GetDefFlowCtrlID() int64 {
+       return r.defFlowCtrlID
+}
+
+// GetGroupFlowCtrlID returns the groupFlowCtrlID.
+func (r *RmtDataCache) GetGroupFlowCtrlID() int64 {
+       return r.groupFlowCtrlID
+}
+
+// GetGroupName returns the group name.
+func (r *RmtDataCache) GetGroupName() string {
+       return r.groupName
+}
+
+// GetSubscribeInfo returns the partitionSubInfo.
+func (r *RmtDataCache) GetSubscribeInfo() []*metadata.SubscribeInfo {
+       r.metaMu.Lock()
+       defer r.metaMu.Unlock()
+       subInfos := make([]*metadata.SubscribeInfo, 0, len(r.partitionSubInfo))
+       for _, sub := range r.partitionSubInfo {
+               subInfos = append(subInfos, sub)
+       }
+       return subInfos
+}
+
+// GetQryPriorityID returns the QryPriorityID.
+func (r *RmtDataCache) GetQryPriorityID() int32 {
+       return r.qryPriorityID
+}
+
+// PollEventResult polls the first event result from the rebalanceResults.
+func (r *RmtDataCache) PollEventResult() *metadata.ConsumerEvent {
+       r.eventMu.Lock()
+       defer r.eventMu.Unlock()
+       if len(r.rebalanceResults) > 0 {
+               event := r.rebalanceResults[0]
+               r.rebalanceResults = r.rebalanceResults[1:]
+               return event
+       }
+       return nil
+}
+
+// GetPartitionByBroker returns the subscribed partitions of the given broker.
+func (r *RmtDataCache) GetPartitionByBroker(broker *metadata.Node) 
[]*metadata.Partition {
+       r.metaMu.Lock()
+       defer r.metaMu.Unlock()
+
+       if partitionMap, ok := r.brokerPartitions[broker]; ok {
+               partitions := make([]*metadata.Partition, 0, len(partitionMap))
+               for partition := range partitionMap {
+                       partitions = append(partitions, r.partitions[partition])
+               }
+               return partitions
+       }
+       return nil
+}
+
+// SetConsumerInfo sets the consumer information including consumerID and 
groupName.
+func (r *RmtDataCache) SetConsumerInfo(consumerID string, group string) {
+       r.consumerID = consumerID
+       r.groupName = group
+}
+
+// UpdateDefFlowCtrlInfo updates the defFlowCtrlInfo.
+func (r *RmtDataCache) UpdateDefFlowCtrlInfo(flowCtrlID int64, flowCtrlInfo 
string) {
+
+}
+
+// UpdateGroupFlowCtrlInfo updates the groupFlowCtrlInfo.
+func (r *RmtDataCache) UpdateGroupFlowCtrlInfo(qryPriorityID int32, flowCtrlID 
int64, flowCtrlInfo string) {
+
+}
+
+// OfferEvent offers an consumer event.
+func (r *RmtDataCache) OfferEvent(event *metadata.ConsumerEvent) {
+       r.eventMu.Lock()
+       defer r.eventMu.Unlock()
+       r.rebalanceResults = append(r.rebalanceResults, event)
+}
+
+// TakeEvent takes an event from the rebalanceResults.
+func (r *RmtDataCache) TakeEvent() *metadata.ConsumerEvent {
+       r.eventMu.Lock()
+       defer r.eventMu.Unlock()
+       if len(r.rebalanceResults) == 0 {
+               return nil
+       }
+       event := r.rebalanceResults[0]
+       r.rebalanceResults = r.rebalanceResults[1:]
+       return event
+}
+
+// ClearEvent clears all the events.
+func (r *RmtDataCache) ClearEvent() {
+       r.eventMu.Lock()
+       defer r.eventMu.Unlock()
+       r.rebalanceResults = r.rebalanceResults[:0]
+}
+
+// RemoveAndGetPartition removes the given partitions.
+func (r *RmtDataCache) RemoveAndGetPartition(subscribeInfos 
[]*metadata.SubscribeInfo, processingRollback bool, partitions 
map[*metadata.Node][]*metadata.Partition) {
+       if len(subscribeInfos) == 0 {
+               return
+       }
+       r.metaMu.Lock()
+       defer r.metaMu.Unlock()
+       for _, sub := range subscribeInfos {
+               partitionKey := sub.GetPartition().GetPartitionKey()
+               if partition, ok := r.partitions[partitionKey]; ok {
+                       if _, ok := r.usedPartitions[partitionKey]; ok {
+                               if processingRollback {
+                                       partition.SetLastConsumed(false)
+                               } else {
+                                       partition.SetLastConsumed(true)
+                               }
+                       }
+                       if _, ok := partitions[partition.GetBroker()]; !ok {
+                               partitions[partition.GetBroker()] = 
[]*metadata.Partition{partition}
+                       } else {
+                               partitions[partition.GetBroker()] = 
append(partitions[partition.GetBroker()], partition)
+                       }
+                       r.removeMetaInfo(partitionKey)
+               }
+               r.resetIdlePartition(partitionKey, false)
+       }
+}
+
+func (r *RmtDataCache) removeMetaInfo(partitionKey string) {
+       if partition, ok := r.partitions[partitionKey]; ok {
+               if partitions, ok := r.topicPartitions[partition.GetTopic()]; 
ok {
+                       delete(partitions, partitionKey)
+                       if len(partitions) == 0 {
+                               delete(r.topicPartitions, partition.GetTopic())
+                       }
+               }
+               if partitions, ok := r.brokerPartitions[partition.GetBroker()]; 
ok {
+                       delete(partitions, partition.GetPartitionKey())
+                       if len(partitions) == 0 {
+                               delete(r.brokerPartitions, 
partition.GetBroker())
+                       }
+               }
+               delete(r.partitions, partitionKey)
+               delete(r.partitionSubInfo, partitionKey)
+       }
+}
+
+func (r *RmtDataCache) resetIdlePartition(partitionKey string, reuse bool) {
+       delete(r.usedPartitions, partitionKey)
+       if timer, ok := r.partitionTimeouts[partitionKey]; ok {
+               if !timer.Stop() {
+                       <-timer.C
+               }
+               timer.Stop()
+               delete(r.partitionTimeouts, partitionKey)
+       }
+       delete(r.indexPartitions, partitionKey)
+       if reuse {
+               if _, ok := r.partitions[partitionKey]; ok {
+                       r.indexPartitions[partitionKey] = true
+               }
+       }
+}
+
+// FilterPartitions returns the unsubscribed partitions.
+func (r *RmtDataCache) FilterPartitions(subInfos []*metadata.SubscribeInfo) 
[]*metadata.Partition {
+       r.metaMu.Lock()
+       defer r.metaMu.Unlock()
+       unsubPartitions := make([]*metadata.Partition, 0, len(subInfos))
+       if len(r.partitions) == 0 {
+               for _, sub := range subInfos {
+                       unsubPartitions = append(unsubPartitions, 
sub.GetPartition())
+               }
+       } else {
+               for _, sub := range subInfos {
+                       if _, ok := 
r.partitions[sub.GetPartition().GetPartitionKey()]; !ok {
+                               unsubPartitions = append(unsubPartitions, 
sub.GetPartition())
+                       }
+               }
+       }
+       return unsubPartitions
+}
+
+// AddNewPartition append a new partition.
+func (r *RmtDataCache) AddNewPartition(newPartition *metadata.Partition) {
+       sub := &metadata.SubscribeInfo{}
+       sub.SetPartition(newPartition)
+       sub.SetConsumerID(r.consumerID)
+       sub.SetGroup(r.groupName)
+
+       r.metaMu.Lock()
+       defer r.metaMu.Unlock()
+       partitionKey := newPartition.GetPartitionKey()
+       if partition, ok := r.partitions[partitionKey]; !ok {
+               r.partitions[partitionKey] = partition
+               if partitions, ok := 
r.topicPartitions[partition.GetPartitionKey()]; !ok {
+                       newPartitions := make(map[string]bool)
+                       newPartitions[partitionKey] = true
+                       r.topicPartitions[partition.GetTopic()] = newPartitions
+               } else if _, ok := partitions[partitionKey]; !ok {
+                       partitions[partitionKey] = true
+               }
+               if partitions, ok := r.brokerPartitions[partition.GetBroker()]; 
!ok {
+                       newPartitions := make(map[string]bool)
+                       newPartitions[partitionKey] = true
+                       r.brokerPartitions[partition.GetBroker()] = 
newPartitions
+               } else if _, ok := partitions[partitionKey]; !ok {
+                       partitions[partitionKey] = true
+               }
+               r.partitionSubInfo[partitionKey] = sub
+       }
+       r.resetIdlePartition(partitionKey, true)
+}
+
+// HandleExpiredPartitions handles the expired partitions.
+func (r *RmtDataCache) HandleExpiredPartitions(wait time.Duration) {
+       r.metaMu.Lock()
+       defer r.metaMu.Unlock()
+       expired := make(map[string]bool, len(r.usedPartitions))
+       if len(r.usedPartitions) > 0 {
+               curr := time.Now().UnixNano() / int64(time.Millisecond)
+               for partition, time := range r.usedPartitions {
+                       if curr-time > wait.Milliseconds() {
+                               expired[partition] = true
+                               if p, ok := r.partitions[partition]; ok {
+                                       p.SetLastConsumed(false)
+                               }
+                       }
+               }
+               if len(expired) > 0 {
+                       for partition := range expired {
+                               r.resetIdlePartition(partition, true)
+                       }
+               }
+       }
+}
+
+// RemovePartition removes the given partition keys.
+func (r *RmtDataCache) RemovePartition(partitionKeys []string) {
+       r.metaMu.Lock()
+       defer r.metaMu.Unlock()
+
+       for _, partitionKey := range partitionKeys {
+               r.resetIdlePartition(partitionKey, false)
+               r.removeMetaInfo(partitionKey)
+       }
+}
+
+// IsFirstRegister returns whether the given partition is first registered.
+func (r *RmtDataCache) IsFirstRegister(partitionKey string) bool {
+       r.dataBookMu.Lock()
+       defer r.dataBookMu.Unlock()
+
+       if _, ok := r.partitionRegBooked[partitionKey]; !ok {
+               r.partitionRegBooked[partitionKey] = true
+       }
+       return r.partitionRegBooked[partitionKey]
+}
diff --git a/tubemq-client-twins/tubemq-client-go/rpc/broker.go 
b/tubemq-client-twins/tubemq-client-go/rpc/broker.go
index 7f9ead2..5105fd5 100644
--- a/tubemq-client-twins/tubemq-client-go/rpc/broker.go
+++ b/tubemq-client-twins/tubemq-client-go/rpc/broker.go
@@ -22,11 +22,13 @@ import (
 
        "github.com/golang/protobuf/proto"
 
-       
"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/client"
        
"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/codec"
        
"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/errs"
        
"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/metadata"
        
"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/protocol"
+       
"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/remote"
+       
"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/sub"
+       
"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/util"
 )
 
 const (
@@ -47,14 +49,14 @@ const (
 )
 
 // RegisterRequestC2B implements the RegisterRequestC2B interface according to 
TubeMQ RPC protocol.
-func (c *rpcClient) RegisterRequestC2B(ctx context.Context, metadata 
*metadata.Metadata, sub *client.SubInfo) (*protocol.RegisterResponseB2C, error) 
{
+func (c *rpcClient) RegisterRequestC2B(ctx context.Context, metadata 
*metadata.Metadata, sub *sub.SubInfo, r *remote.RmtDataCache) 
(*protocol.RegisterResponseB2C, error) {
        reqC2B := &protocol.RegisterRequestC2B{
                OpType:        proto.Int32(register),
                ClientId:      proto.String(sub.GetClientID()),
                GroupName:     
proto.String(metadata.GetSubscribeInfo().GetGroup()),
                TopicName:     
proto.String(metadata.GetSubscribeInfo().GetPartition().GetTopic()),
                PartitionId:   
proto.Int32(metadata.GetSubscribeInfo().GetPartition().GetPartitionID()),
-               QryPriorityId: 
proto.Int32(metadata.GetSubscribeInfo().GetQryPriorityID()),
+               QryPriorityId: proto.Int32(r.GetQryPriorityID()),
                ReadStatus:    proto.Int32(metadata.GetReadStatus()),
                AuthInfo:      sub.GetAuthorizedInfo(),
        }
@@ -66,7 +68,7 @@ func (c *rpcClient) RegisterRequestC2B(ctx context.Context, 
metadata *metadata.M
                }
        }
        offset := 
sub.GetAssignedPartOffset(metadata.GetSubscribeInfo().GetPartition().GetPartitionKey())
-       if offset != client.InValidOffset {
+       if offset != util.InvalidValue {
                reqC2B.CurrOffset = proto.Int64(offset)
        }
        data, err := proto.Marshal(reqC2B)
@@ -87,7 +89,7 @@ func (c *rpcClient) RegisterRequestC2B(ctx context.Context, 
metadata *metadata.M
                Timeout: proto.Int64(c.config.Net.ReadTimeout.Milliseconds()),
        }
 
-       rspBody, err := c.doRequest(ctx, req)
+       rspBody, err := c.doRequest(ctx, metadata.GetNode().GetAddress(), req)
        if err != nil {
                return nil, err
        }
@@ -101,7 +103,7 @@ func (c *rpcClient) RegisterRequestC2B(ctx context.Context, 
metadata *metadata.M
 }
 
 // UnregisterRequestC2B implements the UnregisterRequestC2B interface 
according to TubeMQ RPC protocol.
-func (c *rpcClient) UnregisterRequestC2B(ctx context.Context, metadata 
metadata.Metadata, sub *client.SubInfo) (*protocol.RegisterResponseB2C, error) {
+func (c *rpcClient) UnregisterRequestC2B(ctx context.Context, metadata 
*metadata.Metadata, sub *sub.SubInfo) (*protocol.RegisterResponseB2C, error) {
        reqC2B := &protocol.RegisterRequestC2B{
                OpType:      proto.Int32(unregister),
                ClientId:    proto.String(sub.GetClientID()),
@@ -129,7 +131,7 @@ func (c *rpcClient) UnregisterRequestC2B(ctx 
context.Context, metadata metadata.
                Timeout: proto.Int64(c.config.Net.ReadTimeout.Milliseconds()),
        }
 
-       rspBody, err := c.doRequest(ctx, req)
+       rspBody, err := c.doRequest(ctx, metadata.GetNode().GetAddress(), req)
        if err != nil {
                return nil, err
        }
@@ -143,7 +145,7 @@ func (c *rpcClient) UnregisterRequestC2B(ctx 
context.Context, metadata metadata.
 }
 
 // GetMessageRequestC2B implements the GetMessageRequestC2B interface 
according to TubeMQ RPC protocol.
-func (c *rpcClient) GetMessageRequestC2B(ctx context.Context, metadata 
*metadata.Metadata, sub *client.SubInfo, r *client.RmtDataCache) 
(*protocol.GetMessageResponseB2C, error) {
+func (c *rpcClient) GetMessageRequestC2B(ctx context.Context, metadata 
*metadata.Metadata, sub *sub.SubInfo, r *remote.RmtDataCache) 
(*protocol.GetMessageResponseB2C, error) {
        reqC2B := &protocol.GetMessageRequestC2B{
                ClientId:           proto.String(sub.GetClientID()),
                PartitionId:        
proto.Int32(metadata.GetSubscribeInfo().GetPartition().GetPartitionID()),
@@ -171,7 +173,7 @@ func (c *rpcClient) GetMessageRequestC2B(ctx 
context.Context, metadata *metadata
                Timeout: proto.Int64(c.config.Net.ReadTimeout.Milliseconds()),
        }
 
-       rspBody, err := c.doRequest(ctx, req)
+       rspBody, err := c.doRequest(ctx, metadata.GetNode().GetAddress(), req)
        if err != nil {
                return nil, err
        }
@@ -185,7 +187,7 @@ func (c *rpcClient) GetMessageRequestC2B(ctx 
context.Context, metadata *metadata
 }
 
 // CommitOffsetRequestC2B implements the CommitOffsetRequestC2B interface 
according to TubeMQ RPC protocol.
-func (c *rpcClient) CommitOffsetRequestC2B(ctx context.Context, metadata 
*metadata.Metadata, sub *client.SubInfo) (*protocol.CommitOffsetResponseB2C, 
error) {
+func (c *rpcClient) CommitOffsetRequestC2B(ctx context.Context, metadata 
*metadata.Metadata, sub *sub.SubInfo) (*protocol.CommitOffsetResponseB2C, 
error) {
        reqC2B := &protocol.CommitOffsetRequestC2B{
                ClientId:         proto.String(sub.GetClientID()),
                TopicName:        
proto.String(metadata.GetSubscribeInfo().GetPartition().GetTopic()),
@@ -206,12 +208,12 @@ func (c *rpcClient) CommitOffsetRequestC2B(ctx 
context.Context, metadata *metada
                ProtocolVer: proto.Int32(2),
        }
        req.RequestBody = &protocol.RequestBody{
-               Method:  proto.Int32(brokerConsumerHeartbeat),
+               Method:  proto.Int32(brokerConsumerCommit),
                Request: data,
                Timeout: proto.Int64(c.config.Net.ReadTimeout.Milliseconds()),
        }
 
-       rspBody, err := c.doRequest(ctx, req)
+       rspBody, err := c.doRequest(ctx, metadata.GetNode().GetAddress(), req)
        if err != nil {
                return nil, err
        }
@@ -225,12 +227,12 @@ func (c *rpcClient) CommitOffsetRequestC2B(ctx 
context.Context, metadata *metada
 }
 
 // HeartbeatRequestC2B implements the HeartbeatRequestC2B interface according 
to TubeMQ RPC protocol.
-func (c *rpcClient) HeartbeatRequestC2B(ctx context.Context, metadata 
*metadata.Metadata, sub *client.SubInfo, r *client.RmtDataCache) 
(*protocol.HeartBeatResponseB2C, error) {
+func (c *rpcClient) HeartbeatRequestC2B(ctx context.Context, metadata 
*metadata.Metadata, sub *sub.SubInfo, r *remote.RmtDataCache) 
(*protocol.HeartBeatResponseB2C, error) {
        reqC2B := &protocol.HeartBeatRequestC2B{
                ClientId:      proto.String(sub.GetClientID()),
                GroupName:     
proto.String(metadata.GetSubscribeInfo().GetGroup()),
                ReadStatus:    proto.Int32(metadata.GetReadStatus()),
-               QryPriorityId: 
proto.Int32(metadata.GetSubscribeInfo().GetQryPriorityID()),
+               QryPriorityId: proto.Int32(r.GetQryPriorityID()),
                AuthInfo:      sub.GetAuthorizedInfo(),
        }
        partitions := r.GetPartitionByBroker(metadata.GetNode())
@@ -256,7 +258,7 @@ func (c *rpcClient) HeartbeatRequestC2B(ctx 
context.Context, metadata *metadata.
                Flag: proto.Int32(0),
        }
 
-       rspBody, err := c.doRequest(ctx, req)
+       rspBody, err := c.doRequest(ctx, metadata.GetNode().GetAddress(), req)
        if err != nil {
                return nil, err
        }
diff --git a/tubemq-client-twins/tubemq-client-go/rpc/client.go 
b/tubemq-client-twins/tubemq-client-go/rpc/client.go
index 71123c7..fab5bae 100644
--- a/tubemq-client-twins/tubemq-client-go/rpc/client.go
+++ b/tubemq-client-twins/tubemq-client-go/rpc/client.go
@@ -21,13 +21,14 @@ package rpc
 import (
        "context"
 
-       
"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/client"
        
"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/codec"
        
"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/config"
        
"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/errs"
        
"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/metadata"
        
"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/multiplexing"
        
"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/protocol"
+       
"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/remote"
+       
"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/sub"
        
"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/transport"
 )
 
@@ -39,21 +40,21 @@ const (
 // RPCClient is the rpc level client to interact with TubeMQ.
 type RPCClient interface {
        // RegisterRequestC2B is the rpc request for a consumer to register to 
a broker.
-       RegisterRequestC2B(ctx context.Context, metadata *metadata.Metadata, 
sub *client.SubInfo) (*protocol.RegisterResponseB2C, error)
+       RegisterRequestC2B(ctx context.Context, metadata *metadata.Metadata, 
sub *sub.SubInfo, r *remote.RmtDataCache) (*protocol.RegisterResponseB2C, error)
        // UnregisterRequestC2B is the rpc request for a consumer to unregister 
to a broker.
-       UnregisterRequestC2B(ctx context.Context, metadata metadata.Metadata, 
sub *client.SubInfo) (*protocol.RegisterResponseB2C, error)
+       UnregisterRequestC2B(ctx context.Context, metadata *metadata.Metadata, 
sub *sub.SubInfo) (*protocol.RegisterResponseB2C, error)
        // GetMessageRequestC2B is the rpc request for a consumer to get 
message from a broker.
-       GetMessageRequestC2B(ctx context.Context, metadata *metadata.Metadata, 
sub *client.SubInfo, r *client.RmtDataCache) (*protocol.GetMessageResponseB2C, 
error)
+       GetMessageRequestC2B(ctx context.Context, metadata *metadata.Metadata, 
sub *sub.SubInfo, r *remote.RmtDataCache) (*protocol.GetMessageResponseB2C, 
error)
        // CommitOffsetRequestC2B is the rpc request for a consumer to commit 
offset to a broker.
-       CommitOffsetRequestC2B(ctx context.Context, metadata 
*metadata.Metadata, sub *client.SubInfo) (*protocol.CommitOffsetResponseB2C, 
error)
+       CommitOffsetRequestC2B(ctx context.Context, metadata 
*metadata.Metadata, sub *sub.SubInfo) (*protocol.CommitOffsetResponseB2C, error)
        // HeartbeatRequestC2B is the rpc request for a consumer to send 
heartbeat to a broker.
-       HeartbeatRequestC2B(ctx context.Context, metadata *metadata.Metadata, 
sub *client.SubInfo, r *client.RmtDataCache) (*protocol.HeartBeatResponseB2C, 
error)
+       HeartbeatRequestC2B(ctx context.Context, metadata *metadata.Metadata, 
sub *sub.SubInfo, r *remote.RmtDataCache) (*protocol.HeartBeatResponseB2C, 
error)
        // RegisterRequestC2M is the rpc request for a consumer to register 
request to master.
-       RegisterRequestC2M(ctx context.Context, metadata *metadata.Metadata, 
sub *client.SubInfo, r *client.RmtDataCache) (*protocol.RegisterResponseM2C, 
error)
+       RegisterRequestC2M(ctx context.Context, metadata *metadata.Metadata, 
sub *sub.SubInfo, r *remote.RmtDataCache) (*protocol.RegisterResponseM2C, error)
        // HeartRequestC2M is the rpc request for a consumer to send heartbeat 
to master.
-       HeartRequestC2M(ctx context.Context, metadata *metadata.Metadata, sub 
*client.SubInfo, r *client.RmtDataCache) (*protocol.HeartResponseM2C, error)
+       HeartRequestC2M(ctx context.Context, metadata *metadata.Metadata, sub 
*sub.SubInfo, r *remote.RmtDataCache) (*protocol.HeartResponseM2C, error)
        // CloseRequestC2M is the rpc request for a consumer to be closed to 
master.
-       CloseRequestC2M(ctx context.Context, metadata *metadata.Metadata, sub 
*client.SubInfo) (*protocol.CloseResponseM2C, error)
+       CloseRequestC2M(ctx context.Context, metadata *metadata.Metadata, sub 
*sub.SubInfo) (*protocol.CloseResponseM2C, error)
 }
 
 // New returns a default TubeMQ rpc Client
@@ -70,8 +71,8 @@ type rpcClient struct {
        config *config.Config
 }
 
-func (c *rpcClient) doRequest(ctx context.Context, req codec.RPCRequest) 
(*protocol.RspResponseBody, error) {
-       rsp, err := c.client.DoRequest(ctx, req)
+func (c *rpcClient) doRequest(ctx context.Context, address string, req 
codec.RPCRequest) (*protocol.RspResponseBody, error) {
+       rsp, err := c.client.DoRequest(ctx, address, req)
        if err != nil {
                return nil, errs.New(errs.RetRequestFailure, err.Error())
        }
diff --git a/tubemq-client-twins/tubemq-client-go/rpc/master.go 
b/tubemq-client-twins/tubemq-client-go/rpc/master.go
index 9eb336a..c0d4bc3 100644
--- a/tubemq-client-twins/tubemq-client-go/rpc/master.go
+++ b/tubemq-client-twins/tubemq-client-go/rpc/master.go
@@ -22,11 +22,12 @@ import (
 
        "github.com/golang/protobuf/proto"
 
-       
"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/client"
        
"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/codec"
        
"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/errs"
        
"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/metadata"
        
"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/protocol"
+       
"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/remote"
+       
"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/sub"
 )
 
 const (
@@ -39,7 +40,7 @@ const (
 )
 
 // RegisterRequestRequestC2M implements the RegisterRequestRequestC2M 
interface according to TubeMQ RPC protocol.
-func (c *rpcClient) RegisterRequestC2M(ctx context.Context, metadata 
*metadata.Metadata, sub *client.SubInfo, r *client.RmtDataCache) 
(*protocol.RegisterResponseM2C, error) {
+func (c *rpcClient) RegisterRequestC2M(ctx context.Context, metadata 
*metadata.Metadata, sub *sub.SubInfo, r *remote.RmtDataCache) 
(*protocol.RegisterResponseM2C, error) {
        reqC2M := &protocol.RegisterRequestC2M{
                ClientId:         proto.String(sub.GetClientID()),
                HostName:         proto.String(metadata.GetNode().GetHost()),
@@ -48,7 +49,7 @@ func (c *rpcClient) RegisterRequestC2M(ctx context.Context, 
metadata *metadata.M
                SessionTime:      proto.Int64(sub.GetSubscribedTime()),
                DefFlowCheckId:   proto.Int64(r.GetDefFlowCtrlID()),
                GroupFlowCheckId: proto.Int64(r.GetGroupFlowCtrlID()),
-               QryPriorityId:    
proto.Int32(metadata.GetSubscribeInfo().GetQryPriorityID()),
+               QryPriorityId:    proto.Int32(r.GetQryPriorityID()),
                AuthInfo:         sub.GetMasterCertificateIInfo(),
        }
        reqC2M.TopicList = make([]string, 0, len(sub.GetTopics()))
@@ -92,7 +93,7 @@ func (c *rpcClient) RegisterRequestC2M(ctx context.Context, 
metadata *metadata.M
                Timeout: proto.Int64(c.config.Net.ReadTimeout.Milliseconds()),
        }
 
-       rspBody, err := c.doRequest(ctx, req)
+       rspBody, err := c.doRequest(ctx, metadata.GetNode().GetAddress(), req)
        if err != nil {
                return nil, err
        }
@@ -106,14 +107,14 @@ func (c *rpcClient) RegisterRequestC2M(ctx 
context.Context, metadata *metadata.M
 }
 
 // HeartRequestC2M implements the HeartRequestC2M interface according to 
TubeMQ RPC protocol.
-func (c *rpcClient) HeartRequestC2M(ctx context.Context, metadata 
*metadata.Metadata, sub *client.SubInfo, r *client.RmtDataCache) 
(*protocol.HeartResponseM2C, error) {
+func (c *rpcClient) HeartRequestC2M(ctx context.Context, metadata 
*metadata.Metadata, sub *sub.SubInfo, r *remote.RmtDataCache) 
(*protocol.HeartResponseM2C, error) {
        reqC2M := &protocol.HeartRequestC2M{
                ClientId:            proto.String(sub.GetClientID()),
                GroupName:           
proto.String(metadata.GetSubscribeInfo().GetGroup()),
                DefFlowCheckId:      proto.Int64(r.GetDefFlowCtrlID()),
                GroupFlowCheckId:    proto.Int64(r.GetGroupFlowCtrlID()),
                ReportSubscribeInfo: proto.Bool(false),
-               QryPriorityId:       
proto.Int32(metadata.GetSubscribeInfo().GetQryPriorityID()),
+               QryPriorityId:       proto.Int32(r.GetQryPriorityID()),
        }
        event := r.PollEventResult()
        if event != nil || metadata.GetReportTimes() {
@@ -159,7 +160,7 @@ func (c *rpcClient) HeartRequestC2M(ctx context.Context, 
metadata *metadata.Meta
                Flag: proto.Int32(0),
        }
 
-       rspBody, err := c.doRequest(ctx, req)
+       rspBody, err := c.doRequest(ctx, metadata.GetNode().GetAddress(), req)
        if err != nil {
                return nil, err
        }
@@ -173,7 +174,7 @@ func (c *rpcClient) HeartRequestC2M(ctx context.Context, 
metadata *metadata.Meta
 }
 
 // CloseRequestC2M implements the CloseRequestC2M interface according to 
TubeMQ RPC protocol.
-func (c *rpcClient) CloseRequestC2M(ctx context.Context, metadata 
*metadata.Metadata, sub *client.SubInfo) (*protocol.CloseResponseM2C, error) {
+func (c *rpcClient) CloseRequestC2M(ctx context.Context, metadata 
*metadata.Metadata, sub *sub.SubInfo) (*protocol.CloseResponseM2C, error) {
        reqC2M := &protocol.CloseRequestC2M{
                ClientId:  proto.String(sub.GetClientID()),
                GroupName: proto.String(metadata.GetSubscribeInfo().GetGroup()),
@@ -197,7 +198,7 @@ func (c *rpcClient) CloseRequestC2M(ctx context.Context, 
metadata *metadata.Meta
                Flag: proto.Int32(0),
        }
 
-       rspBody, err := c.doRequest(ctx, req)
+       rspBody, err := c.doRequest(ctx, metadata.GetNode().GetAddress(), req)
        if err != nil {
                return nil, err
        }
diff --git a/tubemq-client-twins/tubemq-client-go/client/info.go 
b/tubemq-client-twins/tubemq-client-go/sub/info.go
similarity index 62%
rename from tubemq-client-twins/tubemq-client-go/client/info.go
rename to tubemq-client-twins/tubemq-client-go/sub/info.go
index 25a2686..737b08a 100644
--- a/tubemq-client-twins/tubemq-client-go/client/info.go
+++ b/tubemq-client-twins/tubemq-client-go/sub/info.go
@@ -15,16 +15,19 @@
  * limitations under the License.
  */
 
-// Package client defines the api and information
-// which can be exposed to user.
-package client
+// package sub defines the subscription information of a client.
+package sub
 
 import (
+       "strconv"
+       "time"
+
+       
"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/config"
        
"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/protocol"
 )
 
 // InvalidOffset represents the offset which is invalid.
-const InValidOffset = -1
+const InValidOffset = -2
 
 // SubInfo represents the sub information of the client.
 type SubInfo struct {
@@ -46,6 +49,46 @@ type SubInfo struct {
        masterCertificateInfo *protocol.MasterCertificateInfo
 }
 
+// NewSubInfo parses the subscription from the config to SubInfo.
+func NewSubInfo(config *config.Config) *SubInfo {
+       s := &SubInfo{
+               boundConsume:    config.Consumer.BoundConsume,
+               subscribedTime:  time.Now().UnixNano() / 
int64(time.Millisecond),
+               firstRegistered: false,
+               topics:          config.Consumer.Topics,
+               topicFilters:    config.Consumer.TopicFilters,
+       }
+       s.topicConds = make([]string, 0, len(config.Consumer.TopicFilters))
+       for topic, filters := range config.Consumer.TopicFilters {
+               cond := topic + "#"
+               count := 0
+               for _, filter := range filters {
+                       if count > 0 {
+                               cond += ","
+                       }
+                       cond += filter
+               }
+               s.topicConds = append(s.topicConds, cond)
+       }
+       if config.Consumer.BoundConsume {
+               s.sessionKey = config.Consumer.SessionKey
+               s.sourceCount = int32(config.Consumer.SourceCount)
+               s.selectBig = config.Consumer.SelectBig
+               assignedPartitions := config.Consumer.PartitionOffset
+               count := 0
+               for partition, offset := range assignedPartitions {
+                       if count > 0 {
+                               s.boundPartitions += ","
+                       }
+                       s.boundPartitions += partition
+                       s.boundPartitions += "="
+                       s.boundPartitions += strconv.Itoa(int(offset))
+                       count++
+               }
+       }
+       return s
+}
+
 // GetClientID returns the client ID.
 func (s *SubInfo) GetClientID() string {
        return s.clientID
@@ -124,6 +167,32 @@ func (s *SubInfo) GetAuthorizedInfo() 
*protocol.AuthorizedInfo {
        return s.authInfo
 }
 
+// GetMasterCertifateInfo returns the masterCertificateInfo.
 func (s *SubInfo) GetMasterCertificateIInfo() *protocol.MasterCertificateInfo {
        return s.masterCertificateInfo
 }
+
+// FirstRegistered sets the firstRegistered to true.
+func (s *SubInfo) FirstRegistered() {
+       s.firstRegistered = true
+}
+
+// SetAuthorizedInfo sets the authorizedInfo.
+func (s *SubInfo) SetAuthorizedInfo(auth *protocol.AuthorizedInfo) {
+       s.authInfo = auth
+}
+
+// SetMasterCertificateInfo sets the masterCertificateInfo.
+func (s *SubInfo) SetMasterCertificateInfo(info 
*protocol.MasterCertificateInfo) {
+       s.masterCertificateInfo = info
+}
+
+// SetIsNotAllocated sets the notAllocated.
+func (s *SubInfo) SetIsNotAllocated(isNotAllocated bool) {
+       s.notAllocated = isNotAllocated
+}
+
+// SetClientID sets the clientID.
+func (s *SubInfo) SetClientID(clientID string) {
+       s.clientID = clientID
+}
diff --git a/tubemq-client-twins/tubemq-client-go/transport/client.go 
b/tubemq-client-twins/tubemq-client-go/transport/client.go
index d816a7f..91c9692 100644
--- a/tubemq-client-twins/tubemq-client-go/transport/client.go
+++ b/tubemq-client-twins/tubemq-client-go/transport/client.go
@@ -28,8 +28,6 @@ import (
 
 // Options represents the transport options
 type Options struct {
-       Address string
-
        CACertFile    string
        TLSCertFile   string
        TLSKeyFile    string
@@ -51,9 +49,9 @@ func New(opts *Options, pool *multiplexing.Pool) *Client {
 }
 
 // DoRequest sends the request and return the decoded response
-func (c *Client) DoRequest(ctx context.Context, req codec.RPCRequest) 
(codec.RPCResponse, error) {
+func (c *Client) DoRequest(ctx context.Context, address string, req 
codec.RPCRequest) (codec.RPCResponse, error) {
        opts := &multiplexing.DialOptions{
-               Address: c.opts.Address,
+               Address: address,
                Network: "tcp",
        }
        if c.opts.CACertFile != "" {
@@ -63,7 +61,7 @@ func (c *Client) DoRequest(ctx context.Context, req 
codec.RPCRequest) (codec.RPC
                opts.TLSServerName = c.opts.TLSServerName
        }
 
-       conn, err := c.pool.Get(ctx, c.opts.Address, req.GetSerialNo(), opts)
+       conn, err := c.pool.Get(ctx, address, req.GetSerialNo(), opts)
        if err != nil {
                return nil, err
        }
diff --git a/tubemq-client-twins/tubemq-client-go/util/util.go 
b/tubemq-client-twins/tubemq-client-go/util/util.go
new file mode 100644
index 0000000..77422ac
--- /dev/null
+++ b/tubemq-client-twins/tubemq-client-go/util/util.go
@@ -0,0 +1,52 @@
+package util
+
+import (
+       "net"
+)
+
+var InvalidValue = int64(-2)
+
+func GetLocalHost() string {
+       ifaces, err := net.Interfaces()
+       if err != nil {
+               return ""
+       }
+       for _, iface := range ifaces {
+               if iface.Flags&net.FlagUp == 0 {
+                       continue // interface down
+               }
+               if iface.Flags&net.FlagLoopback != 0 {
+                       continue // loopback interface
+               }
+               addrs, err := iface.Addrs()
+               if err != nil {
+                       return ""
+               }
+               for _, addr := range addrs {
+                       var ip net.IP
+                       switch v := addr.(type) {
+                       case *net.IPNet:
+                               ip = v.IP
+                       case *net.IPAddr:
+                               ip = v.IP
+                       }
+                       if ip == nil || ip.IsLoopback() {
+                               continue
+                       }
+                       ip = ip.To4()
+                       if ip == nil {
+                               continue // not an ipv4 address
+                       }
+                       return ip.String()
+               }
+       }
+       return ""
+}
+
+func GenBrokerAuthenticateToken(username string, password string) string {
+       return ""
+}
+
+func GenMasterAuthenticateToken(username string, password string) string {
+       return ""
+}

Reply via email to