This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch INLONG-25
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git

commit 877505e72540d9b3a98de2333a3afd3e933eb1d0
Author: Zijie Lu <[email protected]>
AuthorDate: Mon May 24 11:06:45 2021 +0800

    [INLONG-604]rpc request for Go SDK
    
    Signed-off-by: Zijie Lu <[email protected]>
---
 .../tubemq-client-go/client/info.go                | 129 ++++++++++
 .../tubemq-client-go/client/remote.go              |  79 ++++++
 .../tubemq-client-go/codec/tubemq_codec.go         |   8 +-
 .../tubemq-client-go/config/config.go              |  12 +
 tubemq-client-twins/tubemq-client-go/errs/errs.go  |  50 ++++
 .../tubemq-client-go/metadata/consumer_event.go    |  56 +++++
 .../tubemq-client-go/metadata/metadata.go          |  47 ++++
 .../tubemq-client-go/metadata/node.go              |  55 +++++
 .../tubemq-client-go/metadata/partition.go         |  57 +++++
 .../tubemq-client-go/metadata/subcribe_info.go     |  55 +++++
 tubemq-client-twins/tubemq-client-go/rpc/broker.go | 273 +++++++++++++++++++++
 tubemq-client-twins/tubemq-client-go/rpc/client.go |  72 ++++++
 tubemq-client-twins/tubemq-client-go/rpc/master.go | 212 ++++++++++++++++
 13 files changed, 1104 insertions(+), 1 deletion(-)

diff --git a/tubemq-client-twins/tubemq-client-go/client/info.go 
b/tubemq-client-twins/tubemq-client-go/client/info.go
new file mode 100644
index 0000000..25a2686
--- /dev/null
+++ b/tubemq-client-twins/tubemq-client-go/client/info.go
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// Package client defines the api and information
+// which can be exposed to user.
+package client
+
+import (
+       
"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/protocol"
+)
+
+// InValidOffset represents the offset which is invalid.
+const InValidOffset = -1
+
+// SubInfo represents the sub information of the client.
+type SubInfo struct {
+       clientID              string
+       boundConsume          bool
+       selectBig             bool
+       sourceCount           int32
+       sessionKey            string
+       notAllocated          bool
+       firstRegistered       bool
+       subscribedTime        int64
+       boundPartitions       string
+       topics                []string
+       topicConds            []string
+       topicFilter           map[string]bool
+       assignedPartitions    map[string]uint64
+       topicFilters          map[string][]string
+       authInfo              *protocol.AuthorizedInfo
+       masterCertificateInfo *protocol.MasterCertificateInfo
+}
+
+// GetClientID returns the client ID.
+func (s *SubInfo) GetClientID() string {
+       return s.clientID
+}
+
+// IsFiltered returns whether a topic is filtered.
+func (s *SubInfo) IsFiltered(topic string) bool {
+       if filtered, ok := s.topicFilter[topic]; ok {
+               return filtered
+       }
+       return false
+}
+
+// GetTopicFilters returns the topic filters.
+func (s *SubInfo) GetTopicFilters() map[string][]string {
+       return s.topicFilters
+}
+
+// GetAssignedPartOffset returns the assignedPartOffset of the given 
partitionKey.
+func (s *SubInfo) GetAssignedPartOffset(partitionKey string) int64 {
+       if !s.firstRegistered && s.boundConsume && s.notAllocated {
+               if offset, ok := s.assignedPartitions[partitionKey]; ok {
+                       return int64(offset)
+               }
+       }
+       return InValidOffset
+}
+
+// BoundConsume returns whether it is boundConsume.
+func (s *SubInfo) BoundConsume() bool {
+       return s.boundConsume
+}
+
+// GetSubscribedTime returns the subscribedTime.
+func (s *SubInfo) GetSubscribedTime() int64 {
+       return s.subscribedTime
+}
+
+// GetTopics returns the topics.
+func (s *SubInfo) GetTopics() []string {
+       return s.topics
+}
+
+// GetTopicConds returns the topicConds.
+func (s *SubInfo) GetTopicConds() []string {
+       return s.topicConds
+}
+
+// GetSessionKey returns the sessionKey.
+func (s *SubInfo) GetSessionKey() string {
+       return s.sessionKey
+}
+
+// SelectBig returns whether it is selectBig.
+func (s *SubInfo) SelectBig() bool {
+       return s.selectBig
+}
+
+// GetSourceCount returns the sourceCount.
+func (s *SubInfo) GetSourceCount() int32 {
+       return s.sourceCount
+}
+
+// GetBoundPartInfo returns the boundPartitions.
+func (s *SubInfo) GetBoundPartInfo() string {
+       return s.boundPartitions
+}
+
+// IsNotAllocated returns whether it is not allocated.
+func (s *SubInfo) IsNotAllocated() bool {
+       return s.notAllocated
+}
+
+// GetAuthorizedInfo returns the authInfo.
+func (s *SubInfo) GetAuthorizedInfo() *protocol.AuthorizedInfo {
+       return s.authInfo
+}
+
+func (s *SubInfo) GetMasterCertificateIInfo() *protocol.MasterCertificateInfo {
+       return s.masterCertificateInfo
+}
diff --git a/tubemq-client-twins/tubemq-client-go/client/remote.go 
b/tubemq-client-twins/tubemq-client-go/client/remote.go
new file mode 100644
index 0000000..fd60425
--- /dev/null
+++ b/tubemq-client-twins/tubemq-client-go/client/remote.go
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// Package client defines the api and information
+// which can be exposed to user.
+package client
+
+import (
+       "sync"
+
+       
"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/metadata"
+)
+
+// RmtDataCache represents the data returned from TubeMQ.
+type RmtDataCache struct {
+       consumerID       string
+       groupName        string
+       underGroupCtrl   bool
+       defFlowCtrlID    int64
+       groupFlowCtrlID  int64
+       subscribeInfo    []*metadata.SubscribeInfo
+       rebalanceResults []*metadata.ConsumerEvent
+       mu               sync.Mutex
+       brokerToPartitions map[*metadata.Node][]*metadata.Partition
+}
+
+// GetUnderGroupCtrl returns the underGroupCtrl.
+func (r *RmtDataCache) GetUnderGroupCtrl() bool {
+       return r.underGroupCtrl
+}
+
+// GetDefFlowCtrlID returns the defFlowCtrlID.
+func (r *RmtDataCache) GetDefFlowCtrlID() int64 {
+       return r.defFlowCtrlID
+}
+
+// GetGroupFlowCtrlID returns the groupFlowCtrlID.
+func (r *RmtDataCache) GetGroupFlowCtrlID() int64 {
+       return r.groupFlowCtrlID
+}
+
+// GetSubscribeInfo returns the subscribeInfo.
+func (r *RmtDataCache) GetSubscribeInfo() []*metadata.SubscribeInfo {
+       return r.subscribeInfo
+}
+
+// PollEventResult polls the first event result from the rebalanceResults.
+func (r *RmtDataCache) PollEventResult() *metadata.ConsumerEvent {
+       r.mu.Lock()
+       defer r.mu.Unlock()
+       if len(r.rebalanceResults) > 0 {
+               event := r.rebalanceResults[0]
+               r.rebalanceResults = r.rebalanceResults[1:]
+               return event
+       }
+       return nil
+}
+
+// GetPartitionByBroker returns the partitions recorded for the given broker node, or nil if there are none.
+func (r *RmtDataCache) GetPartitionByBroker(node *metadata.Node) 
[]*metadata.Partition {
+       if partitions, ok := r.brokerToPartitions[node]; ok {
+               return partitions
+       }
+       return nil
+}
diff --git a/tubemq-client-twins/tubemq-client-go/codec/tubemq_codec.go 
b/tubemq-client-twins/tubemq-client-go/codec/tubemq_codec.go
index 4a8898f..f805e08 100644
--- a/tubemq-client-twins/tubemq-client-go/codec/tubemq_codec.go
+++ b/tubemq-client-twins/tubemq-client-go/codec/tubemq_codec.go
@@ -49,6 +49,7 @@ const (
 )
 
 var serialNo uint32
+
 func NewSerialNo() uint32 {
        return atomic.AddUint32(&serialNo, 1)
 }
@@ -151,6 +152,12 @@ type TubeMQRPCRequest struct {
        Body          proto.Message
 }
 
+func NewRPCRequest() *TubeMQRPCRequest {
+       return &TubeMQRPCRequest{
+               serialNo: NewSerialNo(),
+       }
+}
+
 // GetSerialNo returns the serialNo.
 func (t *TubeMQRPCRequest) GetSerialNo() uint32 {
        return t.serialNo
@@ -295,4 +302,3 @@ func readDelimitedFrom(data []byte, msg proto.Message) 
([]byte, error) {
 
        return data[int(size)+n:], nil
 }
-
diff --git a/tubemq-client-twins/tubemq-client-go/config/config.go 
b/tubemq-client-twins/tubemq-client-go/config/config.go
new file mode 100644
index 0000000..999d87a
--- /dev/null
+++ b/tubemq-client-twins/tubemq-client-go/config/config.go
@@ -0,0 +1,12 @@
+package config
+
+import (
+       "time"
+)
+
+type Config struct {
+       Net struct {
+               // How long to wait for a response.
+               ReadTimeout time.Duration
+       }
+}
diff --git a/tubemq-client-twins/tubemq-client-go/errs/errs.go 
b/tubemq-client-twins/tubemq-client-go/errs/errs.go
new file mode 100644
index 0000000..637e381
--- /dev/null
+++ b/tubemq-client-twins/tubemq-client-go/errs/errs.go
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// Package errs defines the TubeMQ error codes and TubeMQ error msg.
+package errs
+
+import (
+       "fmt"
+)
+
+const (
+       RetMarshalFailure    = 1
+       RetResponseException = 2
+       RetUnMarshalFailure  = 3
+       RetAssertionFailure  = 4
+)
+
+var ErrAssertionFailure = New(RetAssertionFailure, "AssertionFailure")
+
+// Error provides a TubeMQ-specific error container
+type Error struct {
+       Code int32
+       Msg  string
+}
+
+func (e *Error) Error() string {
+       return fmt.Sprintf("code: %d, msg:%s", e.Code, e.Msg)
+}
+
+func New(code int32, msg string) error {
+       err := &Error{
+               Code: code,
+               Msg:  msg,
+       }
+       return err
+}
diff --git a/tubemq-client-twins/tubemq-client-go/metadata/consumer_event.go 
b/tubemq-client-twins/tubemq-client-go/metadata/consumer_event.go
new file mode 100644
index 0000000..2b4ce49
--- /dev/null
+++ b/tubemq-client-twins/tubemq-client-go/metadata/consumer_event.go
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package metadata
+
+// ConsumerEvent represents the metadata of a consumer event
+type ConsumerEvent struct {
+       rebalanceID   int64
+       eventType     int32
+       eventStatus   int32
+       subscribeInfo []*SubscribeInfo
+}
+
+// GetRebalanceID returns the rebalanceID of a consumer event.
+func (c *ConsumerEvent) GetRebalanceID() int64 {
+       return c.rebalanceID
+}
+
+// GetEventType returns the event type of a consumer event.
+func (c *ConsumerEvent) GetEventType() int32 {
+       return c.eventType
+}
+
+// GetEventStatus returns the event status of a consumer event.
+func (c *ConsumerEvent) GetEventStatus() int32 {
+       return c.eventStatus
+}
+
+// SetEventType sets the event type.
+func (c *ConsumerEvent) SetEventType(eventType int32) {
+       c.eventType = eventType
+}
+
+// SetEventStatus sets the event status.
+func (c *ConsumerEvent) SetEventStatus(eventStatus int32) {
+       c.eventStatus = eventStatus
+}
+
+// GetSubscribeInfo returns the list of SubscribeInfo of a consumer event.
+func (c *ConsumerEvent) GetSubscribeInfo() []*SubscribeInfo {
+       return c.subscribeInfo
+}
diff --git a/tubemq-client-twins/tubemq-client-go/metadata/metadata.go 
b/tubemq-client-twins/tubemq-client-go/metadata/metadata.go
new file mode 100644
index 0000000..861a06e
--- /dev/null
+++ b/tubemq-client-twins/tubemq-client-go/metadata/metadata.go
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// Package metadata defines all the metadata of the TubeMQ broker and producer.
+package metadata
+
+// Metadata groups a node, a SubscribeInfo, the read status and the reportTimes flag.
+type Metadata struct {
+       node          *Node
+       subscribeInfo *SubscribeInfo
+       readStatus    int32
+       reportTimes   bool
+}
+
+// GetNode returns the node of the metadata.
+func (m *Metadata) GetNode() *Node {
+       return m.node
+}
+
+// GetSubscribeInfo returns the SubscribeInfo of the metadata.
+func (m *Metadata) GetSubscribeInfo() *SubscribeInfo {
+       return m.subscribeInfo
+}
+
+// GetReadStatus returns the read status.
+func (m *Metadata) GetReadStatus() int32 {
+       return m.readStatus
+}
+
+// GetReportTimes returns the report times.
+func (m *Metadata) GetReportTimes() bool {
+       return m.reportTimes
+}
diff --git a/tubemq-client-twins/tubemq-client-go/metadata/node.go 
b/tubemq-client-twins/tubemq-client-go/metadata/node.go
new file mode 100644
index 0000000..625f278
--- /dev/null
+++ b/tubemq-client-twins/tubemq-client-go/metadata/node.go
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package metadata
+
+import (
+       "strconv"
+)
+
+// Node represents the metadata of a node.
+type Node struct {
+       id      uint32
+       host    string
+       port    uint32
+       address string
+}
+
+// GetID returns the id of a node.
+func (n *Node) GetID() uint32 {
+       return n.id
+}
+
+// GetPort returns the port of a node.
+func (n *Node) GetPort() uint32 {
+       return n.port
+}
+
+// GetHost returns the hostname of a node.
+func (n *Node) GetHost() string {
+       return n.host
+}
+
+// GetAddress returns the address of a node.
+func (n *Node) GetAddress() string {
+       return n.address
+}
+
+// String returns the metadata of a node as a string.
+func (n *Node) String() string {
+       return strconv.Itoa(int(n.id)) + ":" + n.host + ":" + 
strconv.Itoa(int(n.port))
+}
diff --git a/tubemq-client-twins/tubemq-client-go/metadata/partition.go 
b/tubemq-client-twins/tubemq-client-go/metadata/partition.go
new file mode 100644
index 0000000..a180214
--- /dev/null
+++ b/tubemq-client-twins/tubemq-client-go/metadata/partition.go
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package metadata
+
+import (
+       "strconv"
+)
+
+// Partition represents the metadata of a partition.
+type Partition struct {
+       topic        string
+       Broker       *Node
+       partitionID  int32
+       partitionKey string
+       offset       int64
+       lastConsumed bool
+}
+
+// GetLastConsumed returns lastConsumed of a partition.
+func (p *Partition) GetLastConsumed() bool {
+       return p.lastConsumed
+}
+
+// GetPartitionID returns the partition id of a partition.
+func (p *Partition) GetPartitionID() int32 {
+       return p.partitionID
+}
+
+// GetPartitionKey returns the partition key of a partition.
+func (p *Partition) GetPartitionKey() string {
+       return p.partitionKey
+}
+
+// GetTopic returns the topic of the partition subscribed to.
+func (p *Partition) GetTopic() string {
+       return p.topic
+}
+
+// String returns the metadata of a Partition as a string.
+func (p *Partition) String() string {
+       return p.Broker.String() + "#" + p.topic + "@" + 
strconv.Itoa(int(p.partitionID))
+}
diff --git a/tubemq-client-twins/tubemq-client-go/metadata/subcribe_info.go 
b/tubemq-client-twins/tubemq-client-go/metadata/subcribe_info.go
new file mode 100644
index 0000000..d76437a
--- /dev/null
+++ b/tubemq-client-twins/tubemq-client-go/metadata/subcribe_info.go
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package metadata
+
+import (
+       "fmt"
+)
+
+// SubscribeInfo represents the metadata of the subscribe info.
+type SubscribeInfo struct {
+       group         string
+       consumerID    string
+       partition     *Partition
+       qryPriorityID int32
+}
+
+// GetGroup returns the group name.
+func (s *SubscribeInfo) GetGroup() string {
+       return s.group
+}
+
+// GetConsumerID returns the consumer id.
+func (s *SubscribeInfo) GetConsumerID() string {
+       return s.consumerID
+}
+
+// GetPartition returns the partition.
+func (s *SubscribeInfo) GetPartition() *Partition {
+       return s.partition
+}
+
+// GetQryPriorityID returns the QryPriorityID.
+func (s *SubscribeInfo) GetQryPriorityID() int32 {
+       return s.qryPriorityID
+}
+
+// String returns the contents of SubscribeInfo as a string.
+func (s *SubscribeInfo) String() string {
+       return fmt.Sprintf("%s@%s-%s", s.consumerID, s.group, 
s.partition.String())
+}
diff --git a/tubemq-client-twins/tubemq-client-go/rpc/broker.go 
b/tubemq-client-twins/tubemq-client-go/rpc/broker.go
new file mode 100644
index 0000000..aa7b831
--- /dev/null
+++ b/tubemq-client-twins/tubemq-client-go/rpc/broker.go
@@ -0,0 +1,273 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package rpc
+
+import (
+       "github.com/golang/protobuf/proto"
+
+       
"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/client"
+       
"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/codec"
+       
"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/errs"
+       
"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/metadata"
+       
"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/protocol"
+)
+
+const (
+       Register   = 31
+       Unregister = 32
+)
+
+const (
+       BrokerProducerRegister = iota + 11
+       BrokerProducerHeartbeat
+       BrokerProducerSendMsg
+       BrokerProducerClose
+       BrokerConsumerRegister
+       BrokerConsumerHeartbeat
+       BrokerConsumerGetMsg
+       BrokerConsumerCommit
+       BrokerConsumerClose
+)
+
+// RegisterRequestC2B implements the RegisterRequestC2B interface according to 
TubeMQ RPC protocol.
+func (c *rpcClient) RegisterRequestC2B(metadata *metadata.Metadata, sub 
*client.SubInfo) (*protocol.RegisterResponseB2C, error) {
+       reqC2B := &protocol.RegisterRequestC2B{
+               OpType:        proto.Int32(Register),
+               ClientId:      proto.String(sub.GetClientID()),
+               GroupName:     
proto.String(metadata.GetSubscribeInfo().GetGroup()),
+               TopicName:     
proto.String(metadata.GetSubscribeInfo().GetPartition().GetTopic()),
+               PartitionId:   
proto.Int32(metadata.GetSubscribeInfo().GetPartition().GetPartitionID()),
+               QryPriorityId: 
proto.Int32(metadata.GetSubscribeInfo().GetQryPriorityID()),
+               ReadStatus:    proto.Int32(metadata.GetReadStatus()),
+               AuthInfo:      sub.GetAuthorizedInfo(),
+       }
+       if 
sub.IsFiltered(metadata.GetSubscribeInfo().GetPartition().GetTopic()) {
+               tfs := sub.GetTopicFilters()
+               reqC2B.FilterCondStr = make([]string, 0, 
len(tfs[metadata.GetSubscribeInfo().GetPartition().GetTopic()]))
+               for _, tf := range 
tfs[metadata.GetSubscribeInfo().GetPartition().GetTopic()] {
+                       reqC2B.FilterCondStr = append(reqC2B.FilterCondStr, tf)
+               }
+       }
+       offset := 
sub.GetAssignedPartOffset(metadata.GetSubscribeInfo().GetPartition().GetPartitionKey())
+       if offset != client.InValidOffset {
+               reqC2B.CurrOffset = proto.Int64(offset)
+       }
+       data, err := proto.Marshal(reqC2B)
+       if err != nil {
+               return nil, errs.New(errs.RetMarshalFailure, err.Error())
+       }
+       req := codec.NewRPCRequest()
+       req.RpcHeader = &protocol.RpcConnHeader{
+               Flag: proto.Int32(0),
+       }
+       req.RequestHeader = &protocol.RequestHeader{
+               ServiceType: proto.Int32(ReadService),
+               ProtocolVer: proto.Int32(2),
+       }
+       req.RequestBody = &protocol.RequestBody{
+               Method:  proto.Int32(BrokerConsumerRegister),
+               Request: data,
+               Timeout: proto.Int64(c.config.Net.ReadTimeout.Milliseconds()),
+       }
+       rsp, err := c.client.DoRequest(c.ctx, req)
+       if v, ok := rsp.(*codec.TubeMQRPCResponse); ok {
+               if v.ResponseException != nil {
+                       return nil, errs.New(errs.RetResponseException, 
v.ResponseException.String())
+               }
+               rspC2B := &protocol.RegisterResponseB2C{}
+               err := proto.Unmarshal(v.ResponseBody.Data, rspC2B)
+               if err != nil {
+                       return nil, errs.New(errs.RetUnMarshalFailure, 
err.Error())
+               }
+               return rspC2B, nil
+       }
+       return nil, errs.ErrAssertionFailure
+}
+
+// UnregisterRequestC2B implements the UnregisterRequestC2B interface 
according to TubeMQ RPC protocol.
+func (c *rpcClient) UnregisterRequestC2B(metadata metadata.Metadata, sub 
*client.SubInfo) (*protocol.RegisterResponseB2C, error) {
+       reqC2B := &protocol.RegisterRequestC2B{
+               OpType:      proto.Int32(Unregister),
+               ClientId:    proto.String(sub.GetClientID()),
+               GroupName:   
proto.String(metadata.GetSubscribeInfo().GetGroup()),
+               TopicName:   
proto.String(metadata.GetSubscribeInfo().GetPartition().GetTopic()),
+               PartitionId: 
proto.Int32(metadata.GetSubscribeInfo().GetPartition().GetPartitionID()),
+               ReadStatus:  proto.Int32(metadata.GetReadStatus()),
+               AuthInfo:    sub.GetAuthorizedInfo(),
+       }
+       req := codec.NewRPCRequest()
+       req.RpcHeader = &protocol.RpcConnHeader{
+               Flag: proto.Int32(0),
+       }
+       data, err := proto.Marshal(reqC2B)
+       if err != nil {
+               return nil, errs.New(errs.RetMarshalFailure, err.Error())
+       }
+       req.RequestHeader = &protocol.RequestHeader{
+               ServiceType: proto.Int32(ReadService),
+               ProtocolVer: proto.Int32(2),
+       }
+       req.RequestBody = &protocol.RequestBody{
+               Method:  proto.Int32(BrokerConsumerRegister),
+               Request: data,
+               Timeout: proto.Int64(c.config.Net.ReadTimeout.Milliseconds()),
+       }
+       rsp, err := c.client.DoRequest(c.ctx, req)
+       if v, ok := rsp.(*codec.TubeMQRPCResponse); ok {
+               if v.ResponseException != nil {
+                       return nil, errs.New(errs.RetResponseException, 
v.ResponseException.String())
+               }
+               rspC2B := &protocol.RegisterResponseB2C{}
+               err := proto.Unmarshal(v.ResponseBody.Data, rspC2B)
+               if err != nil {
+                       return nil, errs.New(errs.RetUnMarshalFailure, 
err.Error())
+               }
+               return rspC2B, nil
+       }
+       return nil, errs.ErrAssertionFailure
+}
+
+// GetMessageRequestC2B implements the GetMessageRequestC2B interface 
according to TubeMQ RPC protocol.
+func (c *rpcClient) GetMessageRequestC2B(metadata *metadata.Metadata, sub 
*client.SubInfo, r *client.RmtDataCache) (*protocol.GetMessageResponseB2C, 
error) {
+       reqC2B := &protocol.GetMessageRequestC2B{
+               ClientId:           proto.String(sub.GetClientID()),
+               PartitionId:        
proto.Int32(metadata.GetSubscribeInfo().GetPartition().GetPartitionID()),
+               GroupName:          
proto.String(metadata.GetSubscribeInfo().GetGroup()),
+               TopicName:          
proto.String(metadata.GetSubscribeInfo().GetPartition().GetTopic()),
+               EscFlowCtrl:        proto.Bool(r.GetUnderGroupCtrl()),
+               LastPackConsumed:   
proto.Bool(metadata.GetSubscribeInfo().GetPartition().GetLastConsumed()),
+               ManualCommitOffset: proto.Bool(false),
+       }
+       req := codec.NewRPCRequest()
+       req.RpcHeader = &protocol.RpcConnHeader{
+               Flag: proto.Int32(0),
+       }
+       data, err := proto.Marshal(reqC2B)
+       if err != nil {
+               return nil, errs.New(errs.RetMarshalFailure, err.Error())
+       }
+       req.RequestHeader = &protocol.RequestHeader{
+               ServiceType: proto.Int32(ReadService),
+               ProtocolVer: proto.Int32(2),
+       }
+       req.RequestBody = &protocol.RequestBody{
+               Method:  proto.Int32(BrokerConsumerGetMsg),
+               Request: data,
+               Timeout: proto.Int64(c.config.Net.ReadTimeout.Milliseconds()),
+       }
+       rsp, err := c.client.DoRequest(c.ctx, req)
+       if v, ok := rsp.(*codec.TubeMQRPCResponse); ok {
+               if v.ResponseException != nil {
+                       return nil, errs.New(errs.RetResponseException, 
v.ResponseException.String())
+               }
+               rspC2B := &protocol.GetMessageResponseB2C{}
+               err := proto.Unmarshal(v.ResponseBody.Data, rspC2B)
+               if err != nil {
+                       return nil, errs.New(errs.RetUnMarshalFailure, 
err.Error())
+               }
+               return rspC2B, nil
+       }
+       return nil, errs.ErrAssertionFailure
+}
+
+// CommitOffsetRequestC2B sends the broker a commit request for the partition 
described by metadata and returns the decoded CommitOffsetResponseB2C.
+func (c *rpcClient) CommitOffsetRequestC2B(metadata *metadata.Metadata, sub 
*client.SubInfo) (*protocol.CommitOffsetResponseB2C, error) {
+       reqC2B := &protocol.CommitOffsetRequestC2B{
+               ClientId:         proto.String(sub.GetClientID()),
+               TopicName:        
proto.String(metadata.GetSubscribeInfo().GetPartition().GetTopic()),
+               PartitionId:      
proto.Int32(metadata.GetSubscribeInfo().GetPartition().GetPartitionID()),
+               GroupName:        
proto.String(metadata.GetSubscribeInfo().GetGroup()),
+               LastPackConsumed: 
proto.Bool(metadata.GetSubscribeInfo().GetPartition().GetLastConsumed()),
+       }
+       req := codec.NewRPCRequest()
+       req.RpcHeader = &protocol.RpcConnHeader{
+               Flag: proto.Int32(0), // same flag value as the other C2B requests in this file
+       }
+       data, err := proto.Marshal(reqC2B)
+       if err != nil {
+               return nil, errs.New(errs.RetMarshalFailure, err.Error())
+       }
+       req.RequestHeader = &protocol.RequestHeader{
+               ServiceType: proto.Int32(ReadService),
+               ProtocolVer: proto.Int32(2),
+       }
+       req.RequestBody = &protocol.RequestBody{
+               Method:  proto.Int32(BrokerConsumerCommit), // commit method id, not the heartbeat one
+               Request: data,
+               Timeout: proto.Int64(c.config.Net.ReadTimeout.Milliseconds()),
+       }
+       rsp, err := c.client.DoRequest(c.ctx, req) // NOTE(review): err is not inspected below
+       if v, ok := rsp.(*codec.TubeMQRPCResponse); ok {
+               if v.ResponseException != nil {
+                       return nil, errs.New(errs.RetResponseException, 
v.ResponseException.String())
+               }
+               rspC2B := &protocol.CommitOffsetResponseB2C{}
+               err := proto.Unmarshal(v.ResponseBody.Data, rspC2B)
+               if err != nil {
+                       return nil, errs.New(errs.RetUnMarshalFailure, 
err.Error())
+               }
+               return rspC2B, nil
+       }
+       return nil, errs.ErrAssertionFailure
+}
+
+// HeartbeatRequestC2B implements the HeartbeatRequestC2B interface according 
to TubeMQ RPC protocol.
+func (c *rpcClient) HeartbeatRequestC2B(metadata *metadata.Metadata, sub 
*client.SubInfo, r *client.RmtDataCache) (*protocol.HeartBeatResponseB2C, 
error) {
+       reqC2B := &protocol.HeartBeatRequestC2B{
+               ClientId:      proto.String(sub.GetClientID()),
+               GroupName:     
proto.String(metadata.GetSubscribeInfo().GetGroup()),
+               ReadStatus:    proto.Int32(metadata.GetReadStatus()),
+               QryPriorityId: 
proto.Int32(metadata.GetSubscribeInfo().GetQryPriorityID()),
+               AuthInfo:      sub.GetAuthorizedInfo(),
+       }
+       partitions := r.GetPartitionByBroker(metadata.GetNode())
+       reqC2B.PartitionInfo = make([]string, 0, len(partitions))
+       for _, partition := range partitions {
+               reqC2B.PartitionInfo = append(reqC2B.PartitionInfo, 
partition.String())
+       }
+       data, err := proto.Marshal(reqC2B)
+       if err != nil {
+               return nil, errs.New(errs.RetMarshalFailure, err.Error())
+       }
+       req := codec.NewRPCRequest()
+       req.RequestHeader = &protocol.RequestHeader{
+               ServiceType: proto.Int32(ReadService),
+               ProtocolVer: proto.Int32(2),
+       }
+       req.RequestBody = &protocol.RequestBody{
+               Method:  proto.Int32(BrokerConsumerHeartbeat),
+               Request: data,
+               Timeout: proto.Int64(c.config.Net.ReadTimeout.Milliseconds()),
+       }
+       req.RpcHeader = &protocol.RpcConnHeader{
+               Flag: proto.Int32(0),
+       }
+       rsp, err := c.client.DoRequest(c.ctx, req)
+       if v, ok := rsp.(*codec.TubeMQRPCResponse); ok {
+               if v.ResponseException != nil {
+                       return nil, errs.New(errs.RetResponseException, 
v.ResponseException.String())
+               }
+               rspC2B := &protocol.HeartBeatResponseB2C{}
+               err := proto.Unmarshal(v.ResponseBody.Data, rspC2B)
+               if err != nil {
+                       return nil, errs.New(errs.RetUnMarshalFailure, 
err.Error())
+               }
+               return rspC2B, nil
+       }
+       return nil, errs.ErrAssertionFailure
+}
diff --git a/tubemq-client-twins/tubemq-client-go/rpc/client.go 
b/tubemq-client-twins/tubemq-client-go/rpc/client.go
new file mode 100644
index 0000000..79a446e
--- /dev/null
+++ b/tubemq-client-twins/tubemq-client-go/rpc/client.go
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// Package rpc encapsulates all the rpc requests to TubeMQ.
+package rpc
+
+import (
+       "context"
+
+       
"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/client"
+       
"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/config"
+       
"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/metadata"
+       
"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/multiplexing"
+       
"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/protocol"
+       
"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/transport"
+)
+
+// Service types written into RequestHeader.ServiceType.
+const (
+       // ReadService is the service type set on the consumer heartbeat
+       // request sent to a broker.
+       ReadService = 2
+       // AdminService is the service type set on the consumer requests sent
+       // to the master.
+       AdminService = 4
+)
+
+// RPCClient is the rpc level client to interact with TubeMQ.
+// The C2B methods talk to a broker, the C2M methods talk to the master.
+type RPCClient interface {
+       // RegisterRequestC2B registers a consumer to a broker.
+       RegisterRequestC2B(metadata *metadata.Metadata, sub *client.SubInfo) (*protocol.RegisterResponseB2C, error)
+       // UnregisterRequestC2B unregisters a consumer from a broker.
+       // NOTE(review): unlike its siblings this method takes metadata by value
+       // and returns a RegisterResponseB2C; confirm both are intended.
+       UnregisterRequestC2B(metadata metadata.Metadata, sub *client.SubInfo) (*protocol.RegisterResponseB2C, error)
+       // GetMessageRequestC2B fetches messages for a consumer from a broker.
+       GetMessageRequestC2B(metadata *metadata.Metadata, sub *client.SubInfo, r *client.RmtDataCache) (*protocol.GetMessageResponseB2C, error)
+       // CommitOffsetRequestC2B commits a consumer's offset to a broker.
+       CommitOffsetRequestC2B(metadata *metadata.Metadata, sub *client.SubInfo) (*protocol.CommitOffsetResponseB2C, error)
+       // HeartbeatRequestC2B sends a consumer heartbeat to a broker.
+       HeartbeatRequestC2B(metadata *metadata.Metadata, sub *client.SubInfo, r *client.RmtDataCache) (*protocol.HeartBeatResponseB2C, error)
+       // RegisterRequestC2M registers a consumer to the master.
+       RegisterRequestC2M(metadata *metadata.Metadata, sub *client.SubInfo, r *client.RmtDataCache) (*protocol.RegisterResponseM2C, error)
+       // HeartRequestC2M sends a consumer heartbeat to the master.
+       HeartRequestC2M(metadata *metadata.Metadata, sub *client.SubInfo, r *client.RmtDataCache) (*protocol.HeartResponseM2C, error)
+       // CloseRequestC2M tells the master that a consumer is being closed.
+       CloseRequestC2M(metadata *metadata.Metadata, sub *client.SubInfo) (*protocol.CloseResponseM2C, error)
+}
+
+// rpcClient is the RPCClient implementation returned by New.
+type rpcClient struct {
+       // pool is the multiplexing pool handed to New; it is also given to the
+       // transport client.
+       pool *multiplexing.Pool
+       // client is the transport client every request is sent through.
+       client *transport.Client
+       // ctx is passed to each DoRequest call.
+       ctx context.Context
+       // config supplies Net.ReadTimeout, which is sent as the request timeout.
+       config *config.Config
+}
+
+// New builds an RPCClient whose transport client is created from opts and
+// pool. ctx and config are stored on the client and used by the request
+// methods.
+func New(pool *multiplexing.Pool, opts *transport.Options, ctx context.Context, config *config.Config) RPCClient {
+       c := &rpcClient{
+               pool:   pool,
+               ctx:    ctx,
+               config: config,
+       }
+       c.client = transport.New(opts, pool)
+       return c
+}
diff --git a/tubemq-client-twins/tubemq-client-go/rpc/master.go 
b/tubemq-client-twins/tubemq-client-go/rpc/master.go
new file mode 100644
index 0000000..4f9e7f9
--- /dev/null
+++ b/tubemq-client-twins/tubemq-client-go/rpc/master.go
@@ -0,0 +1,212 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package rpc
+
+import (
+       "github.com/golang/protobuf/proto"
+
+       
"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/client"
+       
"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/codec"
+       
"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/errs"
+       
"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/metadata"
+       
"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/protocol"
+)
+
+// Method ids written into RequestBody.Method for requests to the master.
+// The values are spelled out because they are sent on the wire, so
+// reordering the declarations must not change them.
+const (
+       MasterProducerRegister  = 1
+       MasterProducerHeartbeat = 2
+       MasterProducerClose     = 3
+       MasterConsumerRegister  = 4
+       MasterConsumerHeartbeat = 5
+       MasterConsumerClose     = 6
+)
+
+// RegisterRequestC2M implements the RegisterRequestC2M interface according to TubeMQ RPC protocol.
+//
+// It builds a protocol.RegisterRequestC2M from metadata, sub and r, sends it
+// as a MasterConsumerRegister call and decodes the reply.
+//
+// Errors returned:
+//   - RetMarshalFailure if the request cannot be encoded;
+//   - the error reported by the transport client if the request fails;
+//   - RetResponseException if the reply carries an exception;
+//   - RetUnMarshalFailure if the reply body cannot be decoded;
+//   - ErrAssertionFailure if the reply is not a *codec.TubeMQRPCResponse.
+func (c *rpcClient) RegisterRequestC2M(metadata *metadata.Metadata, sub *client.SubInfo, r *client.RmtDataCache) (*protocol.RegisterResponseM2C, error) {
+       reqC2M := &protocol.RegisterRequestC2M{
+               ClientId:         proto.String(sub.GetClientID()),
+               HostName:         proto.String(metadata.GetNode().GetHost()),
+               GroupName:        proto.String(metadata.GetSubscribeInfo().GetGroup()),
+               RequireBound:     proto.Bool(sub.BoundConsume()),
+               SessionTime:      proto.Int64(sub.GetSubscribedTime()),
+               DefFlowCheckId:   proto.Int64(r.GetDefFlowCtrlID()),
+               GroupFlowCheckId: proto.Int64(r.GetGroupFlowCtrlID()),
+               QryPriorityId:    proto.Int32(metadata.GetSubscribeInfo().GetQryPriorityID()),
+               AuthInfo:         sub.GetMasterCertificateIInfo(),
+       }
+       reqC2M.TopicList = make([]string, 0, len(sub.GetTopics()))
+       reqC2M.TopicList = append(reqC2M.TopicList, sub.GetTopics()...)
+
+       reqC2M.SubscribeInfo = make([]string, 0, len(r.GetSubscribeInfo()))
+       for _, s := range r.GetSubscribeInfo() {
+               reqC2M.SubscribeInfo = append(reqC2M.SubscribeInfo, s.String())
+       }
+
+       reqC2M.TopicCondition = make([]string, 0, len(sub.GetTopicConds()))
+       reqC2M.TopicCondition = append(reqC2M.TopicCondition, sub.GetTopicConds()...)
+
+       // The bound-consume fields are only sent when the subscription asks for it.
+       if sub.BoundConsume() {
+               reqC2M.SessionKey = proto.String(sub.GetSessionKey())
+               reqC2M.SelectBig = proto.Bool(sub.SelectBig())
+               reqC2M.TotalCount = proto.Int32(sub.GetSourceCount())
+               reqC2M.RequiredPartition = proto.String(sub.GetBoundPartInfo())
+               reqC2M.NotAllocated = proto.Bool(sub.IsNotAllocated())
+       }
+
+       // NOTE(review): this replaces the AuthInfo taken from
+       // sub.GetMasterCertificateIInfo() above with an empty certificate;
+       // confirm which of the two is intended.
+       reqC2M.AuthInfo = &protocol.MasterCertificateInfo{
+               AuthInfo: &protocol.AuthenticateInfo{},
+       }
+
+       data, err := proto.Marshal(reqC2M)
+       if err != nil {
+               return nil, errs.New(errs.RetMarshalFailure, err.Error())
+       }
+       req := codec.NewRPCRequest()
+       req.RpcHeader = &protocol.RpcConnHeader{
+               Flag: proto.Int32(0),
+       }
+       req.RequestHeader = &protocol.RequestHeader{
+               ServiceType: proto.Int32(AdminService),
+               ProtocolVer: proto.Int32(2),
+       }
+       req.RequestBody = &protocol.RequestBody{
+               Method:  proto.Int32(MasterConsumerRegister),
+               Request: data,
+               Timeout: proto.Int64(c.config.Net.ReadTimeout.Milliseconds()),
+       }
+       rsp, err := c.client.DoRequest(c.ctx, req)
+       if err != nil {
+               // Surface the transport failure instead of masking it as an
+               // assertion failure on the (absent) response.
+               return nil, err
+       }
+       v, ok := rsp.(*codec.TubeMQRPCResponse)
+       if !ok {
+               return nil, errs.ErrAssertionFailure
+       }
+       if v.ResponseException != nil {
+               // err is nil on this path, so the message must come from the
+               // exception itself.
+               return nil, errs.New(errs.RetResponseException, v.ResponseException.String())
+       }
+       rspM2C := &protocol.RegisterResponseM2C{}
+       if err := proto.Unmarshal(v.ResponseBody.Data, rspM2C); err != nil {
+               return nil, errs.New(errs.RetUnMarshalFailure, err.Error())
+       }
+       return rspM2C, nil
+}
+
+// HeartRequestC2M implements the HeartRequestC2M interface according to TubeMQ RPC protocol.
+//
+// It builds a protocol.HeartRequestC2M from metadata, sub and r, sends it as
+// a MasterConsumerHeartbeat call and decodes the reply. The subscribe info is
+// reported when r has a pending event result or metadata.GetReportTimes() is
+// true; a pending event result is also attached to the request.
+//
+// Errors returned:
+//   - RetMarshalFailure if the request cannot be encoded;
+//   - the error reported by the transport client if the request fails;
+//   - RetResponseException if the reply carries an exception;
+//   - RetUnMarshalFailure if the reply body cannot be decoded;
+//   - ErrAssertionFailure if the reply is not a *codec.TubeMQRPCResponse.
+func (c *rpcClient) HeartRequestC2M(metadata *metadata.Metadata, sub *client.SubInfo, r *client.RmtDataCache) (*protocol.HeartResponseM2C, error) {
+       reqC2M := &protocol.HeartRequestC2M{
+               ClientId:            proto.String(sub.GetClientID()),
+               GroupName:           proto.String(metadata.GetSubscribeInfo().GetGroup()),
+               DefFlowCheckId:      proto.Int64(r.GetDefFlowCtrlID()),
+               GroupFlowCheckId:    proto.Int64(r.GetGroupFlowCtrlID()),
+               ReportSubscribeInfo: proto.Bool(false),
+               QryPriorityId:       proto.Int32(metadata.GetSubscribeInfo().GetQryPriorityID()),
+       }
+       event := r.PollEventResult()
+       if event != nil || metadata.GetReportTimes() {
+               reqC2M.ReportSubscribeInfo = proto.Bool(true)
+               subscribeInfo := r.GetSubscribeInfo()
+               if len(subscribeInfo) > 0 {
+                       reqC2M.SubscribeInfo = make([]string, 0, len(subscribeInfo))
+                       for _, s := range subscribeInfo {
+                               reqC2M.SubscribeInfo = append(reqC2M.SubscribeInfo, s.String())
+                       }
+               }
+       }
+       if event != nil {
+               ep := &protocol.EventProto{
+                       RebalanceId: proto.Int64(event.GetRebalanceID()),
+                       OpType:      proto.Int32(event.GetEventType()),
+                       Status:      proto.Int32(event.GetEventStatus()),
+               }
+               si := event.GetSubscribeInfo()
+               ep.SubscribeInfo = make([]string, 0, len(si))
+               for _, s := range si {
+                       ep.SubscribeInfo = append(ep.SubscribeInfo, s.String())
+               }
+               // Attach the event to the request; without this assignment the
+               // event built above is silently dropped.
+               reqC2M.Event = ep
+       }
+       reqC2M.AuthInfo = &protocol.MasterCertificateInfo{
+               AuthInfo: &protocol.AuthenticateInfo{},
+       }
+       data, err := proto.Marshal(reqC2M)
+       if err != nil {
+               return nil, errs.New(errs.RetMarshalFailure, err.Error())
+       }
+       req := codec.NewRPCRequest()
+       req.RequestHeader = &protocol.RequestHeader{
+               ServiceType: proto.Int32(AdminService),
+               ProtocolVer: proto.Int32(2),
+       }
+       req.RequestBody = &protocol.RequestBody{
+               Method:  proto.Int32(MasterConsumerHeartbeat),
+               Request: data,
+               Timeout: proto.Int64(c.config.Net.ReadTimeout.Milliseconds()),
+       }
+       req.RpcHeader = &protocol.RpcConnHeader{
+               Flag: proto.Int32(0),
+       }
+       rsp, err := c.client.DoRequest(c.ctx, req)
+       if err != nil {
+               // Surface the transport failure instead of masking it as an
+               // assertion failure on the (absent) response.
+               return nil, err
+       }
+       v, ok := rsp.(*codec.TubeMQRPCResponse)
+       if !ok {
+               return nil, errs.ErrAssertionFailure
+       }
+       if v.ResponseException != nil {
+               // err is nil on this path, so the message must come from the
+               // exception itself.
+               return nil, errs.New(errs.RetResponseException, v.ResponseException.String())
+       }
+       rspM2C := &protocol.HeartResponseM2C{}
+       if err := proto.Unmarshal(v.ResponseBody.Data, rspM2C); err != nil {
+               return nil, errs.New(errs.RetUnMarshalFailure, err.Error())
+       }
+       return rspM2C, nil
+}
+
+// CloseRequestC2M implements the CloseRequestC2M interface according to TubeMQ RPC protocol.
+//
+// It builds a protocol.CloseRequestC2M from metadata and sub, sends it as a
+// MasterConsumerClose call and decodes the reply.
+//
+// Errors returned:
+//   - RetMarshalFailure if the request cannot be encoded;
+//   - the error reported by the transport client if the request fails;
+//   - RetResponseException if the reply carries an exception;
+//   - RetUnMarshalFailure if the reply body cannot be decoded;
+//   - ErrAssertionFailure if the reply is not a *codec.TubeMQRPCResponse.
+func (c *rpcClient) CloseRequestC2M(metadata *metadata.Metadata, sub *client.SubInfo) (*protocol.CloseResponseM2C, error) {
+       reqC2M := &protocol.CloseRequestC2M{
+               ClientId:  proto.String(sub.GetClientID()),
+               GroupName: proto.String(metadata.GetSubscribeInfo().GetGroup()),
+               AuthInfo:  sub.GetMasterCertificateIInfo(),
+       }
+       data, err := proto.Marshal(reqC2M)
+       if err != nil {
+               return nil, errs.New(errs.RetMarshalFailure, err.Error())
+       }
+       req := codec.NewRPCRequest()
+       req.RequestHeader = &protocol.RequestHeader{
+               ServiceType: proto.Int32(AdminService),
+               ProtocolVer: proto.Int32(2),
+       }
+       req.RequestBody = &protocol.RequestBody{
+               Method:  proto.Int32(MasterConsumerClose),
+               Request: data,
+               Timeout: proto.Int64(c.config.Net.ReadTimeout.Milliseconds()),
+       }
+       req.RpcHeader = &protocol.RpcConnHeader{
+               Flag: proto.Int32(0),
+       }
+       rsp, err := c.client.DoRequest(c.ctx, req)
+       if err != nil {
+               // Surface the transport failure instead of masking it as an
+               // assertion failure on the (absent) response.
+               return nil, err
+       }
+       v, ok := rsp.(*codec.TubeMQRPCResponse)
+       if !ok {
+               return nil, errs.ErrAssertionFailure
+       }
+       if v.ResponseException != nil {
+               // err is nil on this path, so the message must come from the
+               // exception itself.
+               return nil, errs.New(errs.RetResponseException, v.ResponseException.String())
+       }
+       rspM2C := &protocol.CloseResponseM2C{}
+       if err := proto.Unmarshal(v.ResponseBody.Data, rspM2C); err != nil {
+               return nil, errs.New(errs.RetUnMarshalFailure, err.Error())
+       }
+       return rspM2C, nil
+}

Reply via email to