This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch INLONG-25
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git

commit ddfab69aa7ed27c9b3b46c49338a87030349f245
Author: Zijie Lu <[email protected]>
AuthorDate: Mon May 24 16:34:56 2021 +0800

    Pass context from client
    
    Signed-off-by: Zijie Lu <[email protected]>
---
 .../tubemq-client-go/client/remote.go              |  16 +-
 tubemq-client-twins/tubemq-client-go/errs/errs.go  |  15 +-
 tubemq-client-twins/tubemq-client-go/rpc/broker.go | 181 ++++++++++-----------
 tubemq-client-twins/tubemq-client-go/rpc/client.go |  52 ++++--
 tubemq-client-twins/tubemq-client-go/rpc/master.go | 104 ++++++------
 5 files changed, 186 insertions(+), 182 deletions(-)

diff --git a/tubemq-client-twins/tubemq-client-go/client/remote.go b/tubemq-client-twins/tubemq-client-go/client/remote.go
index fd60425..e36b69f 100644
--- a/tubemq-client-twins/tubemq-client-go/client/remote.go
+++ b/tubemq-client-twins/tubemq-client-go/client/remote.go
@@ -27,14 +27,14 @@ import (
 
 // RmtDataCache represents the data returned from TubeMQ.
 type RmtDataCache struct {
-       consumerID       string
-       groupName        string
-       underGroupCtrl   bool
-       defFlowCtrlID    int64
-       groupFlowCtrlID  int64
-       subscribeInfo    []*metadata.SubscribeInfo
-       rebalanceResults []*metadata.ConsumerEvent
-       mu               sync.Mutex
+       consumerID         string
+       groupName          string
+       underGroupCtrl     bool
+       defFlowCtrlID      int64
+       groupFlowCtrlID    int64
+       subscribeInfo      []*metadata.SubscribeInfo
+       rebalanceResults   []*metadata.ConsumerEvent
+       mu                 sync.Mutex
        brokerToPartitions map[*metadata.Node][]*metadata.Partition
 }
 
diff --git a/tubemq-client-twins/tubemq-client-go/errs/errs.go b/tubemq-client-twins/tubemq-client-go/errs/errs.go
index 637e381..0ef803d 100644
--- a/tubemq-client-twins/tubemq-client-go/errs/errs.go
+++ b/tubemq-client-twins/tubemq-client-go/errs/errs.go
@@ -23,12 +23,19 @@ import (
 )
 
 const (
-       RetMarshalFailure    = 1
+       // RetMarshalFailure represents the error code of marshal error.
+       RetMarshalFailure = 1
+       // RetResponseException represents the error code of response exception.
        RetResponseException = 2
-       RetUnMarshalFailure  = 3
-       RetAssertionFailure  = 4
+       // RetUnMarshalFailure represents the error code of unmarshal error.
+       RetUnMarshalFailure = 3
+       // RetAssertionFailure represents the error code of assertion error.
+       RetAssertionFailure = 4
+       // RetRequestFailure represents the error code of request error.
+       RetRequestFailure = 5
 )
 
+// ErrAssertionFailure represents RetAssertionFailure error.
 var ErrAssertionFailure = New(RetAssertionFailure, "AssertionFailure")
 
 // Error provides a TubeMQ-specific error container
@@ -37,10 +44,12 @@ type Error struct {
        Msg  string
 }
 
+// Error() implements the Error interface.
 func (e *Error) Error() string {
        return fmt.Sprintf("code: %d, msg:%s", e.Code, e.Msg)
 }
 
+// New returns a self-defined error.
 func New(code int32, msg string) error {
        err := &Error{
                Code: code,
diff --git a/tubemq-client-twins/tubemq-client-go/rpc/broker.go b/tubemq-client-twins/tubemq-client-go/rpc/broker.go
index b2b614d..7f9ead2 100644
--- a/tubemq-client-twins/tubemq-client-go/rpc/broker.go
+++ b/tubemq-client-twins/tubemq-client-go/rpc/broker.go
@@ -30,26 +30,26 @@ import (
 )
 
 const (
-       Register   = 31
-       Unregister = 32
+       register   = 31
+       unregister = 32
 )
 
 const (
-       BrokerProducerRegister = iota + 11
-       BrokerProducerHeartbeat
-       BrokerProducerSendMsg
-       BrokerProducerClose
-       BrokerConsumerRegister
-       BrokerConsumerHeartbeat
-       BrokerConsumerGetMsg
-       BrokerConsumerCommit
-       BrokerConsumerClose
+       brokerProducerRegister = iota + 11
+       brokerProducerHeartbeat
+       brokerProducerSendMsg
+       brokerProducerClose
+       brokerConsumerRegister
+       brokerConsumerHeartbeat
+       brokerConsumerGetMsg
+       brokerConsumerCommit
+       brokerConsumerClose
 )
 
 // RegisterRequestC2B implements the RegisterRequestC2B interface according to TubeMQ RPC protocol.
-func (c *rpcClient) RegisterRequestC2B(metadata *metadata.Metadata, sub 
*client.SubInfo) (*protocol.RegisterResponseB2C, error) {
+func (c *rpcClient) RegisterRequestC2B(ctx context.Context, metadata 
*metadata.Metadata, sub *client.SubInfo) (*protocol.RegisterResponseB2C, error) 
{
        reqC2B := &protocol.RegisterRequestC2B{
-               OpType:        proto.Int32(Register),
+               OpType:        proto.Int32(register),
                ClientId:      proto.String(sub.GetClientID()),
                GroupName:     
proto.String(metadata.GetSubscribeInfo().GetGroup()),
                TopicName:     
proto.String(metadata.GetSubscribeInfo().GetPartition().GetTopic()),
@@ -78,35 +78,32 @@ func (c *rpcClient) RegisterRequestC2B(metadata *metadata.Metadata, sub *client.
                Flag: proto.Int32(0),
        }
        req.RequestHeader = &protocol.RequestHeader{
-               ServiceType: proto.Int32(ReadService),
+               ServiceType: proto.Int32(brokerReadService),
                ProtocolVer: proto.Int32(2),
        }
        req.RequestBody = &protocol.RequestBody{
-               Method:  proto.Int32(BrokerConsumerRegister),
+               Method:  proto.Int32(brokerConsumerRegister),
                Request: data,
                Timeout: proto.Int64(c.config.Net.ReadTimeout.Milliseconds()),
        }
-       ctx, cancel := context.WithTimeout(context.Background(), 
c.config.Net.ReadTimeout)
-       defer cancel()
-       rsp, err := c.client.DoRequest(ctx, req)
-       if v, ok := rsp.(*codec.TubeMQRPCResponse); ok {
-               if v.ResponseException != nil {
-                       return nil, errs.New(errs.RetResponseException, 
v.ResponseException.String())
-               }
-               rspC2B := &protocol.RegisterResponseB2C{}
-               err := proto.Unmarshal(v.ResponseBody.Data, rspC2B)
-               if err != nil {
-                       return nil, errs.New(errs.RetUnMarshalFailure, 
err.Error())
-               }
-               return rspC2B, nil
+
+       rspBody, err := c.doRequest(ctx, req)
+       if err != nil {
+               return nil, err
        }
-       return nil, errs.ErrAssertionFailure
+
+       rspC2B := &protocol.RegisterResponseB2C{}
+       err = proto.Unmarshal(rspBody.Data, rspC2B)
+       if err != nil {
+               return nil, errs.New(errs.RetUnMarshalFailure, err.Error())
+       }
+       return rspC2B, nil
 }
 
 // UnregisterRequestC2B implements the UnregisterRequestC2B interface according to TubeMQ RPC protocol.
-func (c *rpcClient) UnregisterRequestC2B(metadata metadata.Metadata, sub 
*client.SubInfo) (*protocol.RegisterResponseB2C, error) {
+func (c *rpcClient) UnregisterRequestC2B(ctx context.Context, metadata 
metadata.Metadata, sub *client.SubInfo) (*protocol.RegisterResponseB2C, error) {
        reqC2B := &protocol.RegisterRequestC2B{
-               OpType:      proto.Int32(Unregister),
+               OpType:      proto.Int32(unregister),
                ClientId:    proto.String(sub.GetClientID()),
                GroupName:   
proto.String(metadata.GetSubscribeInfo().GetGroup()),
                TopicName:   
proto.String(metadata.GetSubscribeInfo().GetPartition().GetTopic()),
@@ -123,33 +120,30 @@ func (c *rpcClient) UnregisterRequestC2B(metadata metadata.Metadata, sub *client
                return nil, errs.New(errs.RetMarshalFailure, err.Error())
        }
        req.RequestHeader = &protocol.RequestHeader{
-               ServiceType: proto.Int32(ReadService),
+               ServiceType: proto.Int32(brokerReadService),
                ProtocolVer: proto.Int32(2),
        }
        req.RequestBody = &protocol.RequestBody{
-               Method:  proto.Int32(BrokerConsumerRegister),
+               Method:  proto.Int32(brokerConsumerRegister),
                Request: data,
                Timeout: proto.Int64(c.config.Net.ReadTimeout.Milliseconds()),
        }
-       ctx, cancel := context.WithTimeout(context.Background(), 
c.config.Net.ReadTimeout)
-       defer cancel()
-       rsp, err := c.client.DoRequest(ctx, req)
-       if v, ok := rsp.(*codec.TubeMQRPCResponse); ok {
-               if v.ResponseException != nil {
-                       return nil, errs.New(errs.RetResponseException, 
v.ResponseException.String())
-               }
-               rspC2B := &protocol.RegisterResponseB2C{}
-               err := proto.Unmarshal(v.ResponseBody.Data, rspC2B)
-               if err != nil {
-                       return nil, errs.New(errs.RetUnMarshalFailure, 
err.Error())
-               }
-               return rspC2B, nil
+
+       rspBody, err := c.doRequest(ctx, req)
+       if err != nil {
+               return nil, err
+       }
+
+       rspC2B := &protocol.RegisterResponseB2C{}
+       err = proto.Unmarshal(rspBody.Data, rspC2B)
+       if err != nil {
+               return nil, errs.New(errs.RetUnMarshalFailure, err.Error())
        }
-       return nil, errs.ErrAssertionFailure
+       return rspC2B, nil
 }
 
 // GetMessageRequestC2B implements the GetMessageRequestC2B interface according to TubeMQ RPC protocol.
-func (c *rpcClient) GetMessageRequestC2B(metadata *metadata.Metadata, sub 
*client.SubInfo, r *client.RmtDataCache) (*protocol.GetMessageResponseB2C, 
error) {
+func (c *rpcClient) GetMessageRequestC2B(ctx context.Context, metadata 
*metadata.Metadata, sub *client.SubInfo, r *client.RmtDataCache) 
(*protocol.GetMessageResponseB2C, error) {
        reqC2B := &protocol.GetMessageRequestC2B{
                ClientId:           proto.String(sub.GetClientID()),
                PartitionId:        
proto.Int32(metadata.GetSubscribeInfo().GetPartition().GetPartitionID()),
@@ -168,33 +162,30 @@ func (c *rpcClient) GetMessageRequestC2B(metadata *metadata.Metadata, sub *clien
                return nil, errs.New(errs.RetMarshalFailure, err.Error())
        }
        req.RequestHeader = &protocol.RequestHeader{
-               ServiceType: proto.Int32(ReadService),
+               ServiceType: proto.Int32(brokerReadService),
                ProtocolVer: proto.Int32(2),
        }
        req.RequestBody = &protocol.RequestBody{
-               Method:  proto.Int32(BrokerConsumerGetMsg),
+               Method:  proto.Int32(brokerConsumerGetMsg),
                Request: data,
                Timeout: proto.Int64(c.config.Net.ReadTimeout.Milliseconds()),
        }
-       ctx, cancel := context.WithTimeout(context.Background(), 
c.config.Net.ReadTimeout)
-       defer cancel()
-       rsp, err := c.client.DoRequest(ctx, req)
-       if v, ok := rsp.(*codec.TubeMQRPCResponse); ok {
-               if v.ResponseException != nil {
-                       return nil, errs.New(errs.RetResponseException, 
v.ResponseException.String())
-               }
-               rspC2B := &protocol.GetMessageResponseB2C{}
-               err := proto.Unmarshal(v.ResponseBody.Data, rspC2B)
-               if err != nil {
-                       return nil, errs.New(errs.RetUnMarshalFailure, 
err.Error())
-               }
-               return rspC2B, nil
+
+       rspBody, err := c.doRequest(ctx, req)
+       if err != nil {
+               return nil, err
        }
-       return nil, errs.ErrAssertionFailure
+
+       rspC2B := &protocol.GetMessageResponseB2C{}
+       err = proto.Unmarshal(rspBody.Data, rspC2B)
+       if err != nil {
+               return nil, errs.New(errs.RetUnMarshalFailure, err.Error())
+       }
+       return rspC2B, nil
 }
 
 // CommitOffsetRequestC2B implements the CommitOffsetRequestC2B interface according to TubeMQ RPC protocol.
-func (c *rpcClient) CommitOffsetRequestC2B(metadata *metadata.Metadata, sub 
*client.SubInfo) (*protocol.CommitOffsetResponseB2C, error) {
+func (c *rpcClient) CommitOffsetRequestC2B(ctx context.Context, metadata 
*metadata.Metadata, sub *client.SubInfo) (*protocol.CommitOffsetResponseB2C, 
error) {
        reqC2B := &protocol.CommitOffsetRequestC2B{
                ClientId:         proto.String(sub.GetClientID()),
                TopicName:        
proto.String(metadata.GetSubscribeInfo().GetPartition().GetTopic()),
@@ -211,33 +202,30 @@ func (c *rpcClient) CommitOffsetRequestC2B(metadata *metadata.Metadata, sub *cli
                return nil, errs.New(errs.RetMarshalFailure, err.Error())
        }
        req.RequestHeader = &protocol.RequestHeader{
-               ServiceType: proto.Int32(ReadService),
+               ServiceType: proto.Int32(brokerReadService),
                ProtocolVer: proto.Int32(2),
        }
        req.RequestBody = &protocol.RequestBody{
-               Method:  proto.Int32(BrokerConsumerHeartbeat),
+               Method:  proto.Int32(brokerConsumerHeartbeat),
                Request: data,
                Timeout: proto.Int64(c.config.Net.ReadTimeout.Milliseconds()),
        }
-       ctx, cancel := context.WithTimeout(context.Background(), 
c.config.Net.ReadTimeout)
-       defer cancel()
-       rsp, err := c.client.DoRequest(ctx, req)
-       if v, ok := rsp.(*codec.TubeMQRPCResponse); ok {
-               if v.ResponseException != nil {
-                       return nil, errs.New(errs.RetResponseException, 
v.ResponseException.String())
-               }
-               rspC2B := &protocol.CommitOffsetResponseB2C{}
-               err := proto.Unmarshal(v.ResponseBody.Data, rspC2B)
-               if err != nil {
-                       return nil, errs.New(errs.RetUnMarshalFailure, 
err.Error())
-               }
-               return rspC2B, nil
+
+       rspBody, err := c.doRequest(ctx, req)
+       if err != nil {
+               return nil, err
        }
-       return nil, errs.ErrAssertionFailure
+
+       rspC2B := &protocol.CommitOffsetResponseB2C{}
+       err = proto.Unmarshal(rspBody.Data, rspC2B)
+       if err != nil {
+               return nil, errs.New(errs.RetUnMarshalFailure, err.Error())
+       }
+       return rspC2B, nil
 }
 
 // HeartbeatRequestC2B implements the HeartbeatRequestC2B interface according to TubeMQ RPC protocol.
-func (c *rpcClient) HeartbeatRequestC2B(metadata *metadata.Metadata, sub 
*client.SubInfo, r *client.RmtDataCache) (*protocol.HeartBeatResponseB2C, 
error) {
+func (c *rpcClient) HeartbeatRequestC2B(ctx context.Context, metadata 
*metadata.Metadata, sub *client.SubInfo, r *client.RmtDataCache) 
(*protocol.HeartBeatResponseB2C, error) {
        reqC2B := &protocol.HeartBeatRequestC2B{
                ClientId:      proto.String(sub.GetClientID()),
                GroupName:     
proto.String(metadata.GetSubscribeInfo().GetGroup()),
@@ -256,30 +244,27 @@ func (c *rpcClient) HeartbeatRequestC2B(metadata *metadata.Metadata, sub *client
        }
        req := codec.NewRPCRequest()
        req.RequestHeader = &protocol.RequestHeader{
-               ServiceType: proto.Int32(ReadService),
+               ServiceType: proto.Int32(brokerReadService),
                ProtocolVer: proto.Int32(2),
        }
        req.RequestBody = &protocol.RequestBody{
-               Method:  proto.Int32(BrokerConsumerHeartbeat),
+               Method:  proto.Int32(brokerConsumerHeartbeat),
                Request: data,
                Timeout: proto.Int64(c.config.Net.ReadTimeout.Milliseconds()),
        }
        req.RpcHeader = &protocol.RpcConnHeader{
                Flag: proto.Int32(0),
        }
-       ctx, cancel := context.WithTimeout(context.Background(), 
c.config.Net.ReadTimeout)
-       defer cancel()
-       rsp, err := c.client.DoRequest(ctx, req)
-       if v, ok := rsp.(*codec.TubeMQRPCResponse); ok {
-               if v.ResponseException != nil {
-                       return nil, errs.New(errs.RetResponseException, 
v.ResponseException.String())
-               }
-               rspC2B := &protocol.HeartBeatResponseB2C{}
-               err := proto.Unmarshal(v.ResponseBody.Data, rspC2B)
-               if err != nil {
-                       return nil, errs.New(errs.RetUnMarshalFailure, 
err.Error())
-               }
-               return rspC2B, nil
+
+       rspBody, err := c.doRequest(ctx, req)
+       if err != nil {
+               return nil, err
+       }
+
+       rspC2B := &protocol.HeartBeatResponseB2C{}
+       err = proto.Unmarshal(rspBody.Data, rspC2B)
+       if err != nil {
+               return nil, errs.New(errs.RetUnMarshalFailure, err.Error())
        }
-       return nil, errs.ErrAssertionFailure
+       return rspC2B, nil
 }
diff --git a/tubemq-client-twins/tubemq-client-go/rpc/client.go b/tubemq-client-twins/tubemq-client-go/rpc/client.go
index 46961cd..71123c7 100644
--- a/tubemq-client-twins/tubemq-client-go/rpc/client.go
+++ b/tubemq-client-twins/tubemq-client-go/rpc/client.go
@@ -19,8 +19,12 @@
 package rpc
 
 import (
+       "context"
+
        
"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/client"
+       
"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/codec"
        
"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/config"
+       
"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/errs"
        
"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/metadata"
        
"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/multiplexing"
        
"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/protocol"
@@ -28,28 +32,36 @@ import (
 )
 
 const (
-       ReadService = 2
-       AdminService = 4
+       brokerReadService = 2
+       masterService     = 4
 )
 
 // RPCClient is the rpc level client to interact with TubeMQ.
 type RPCClient interface {
        // RegisterRequestC2B is the rpc request for a consumer to register to a broker.
-       RegisterRequestC2B(metadata *metadata.Metadata, sub *client.SubInfo) 
(*protocol.RegisterResponseB2C, error)
+       RegisterRequestC2B(ctx context.Context, metadata *metadata.Metadata, 
sub *client.SubInfo) (*protocol.RegisterResponseB2C, error)
        // UnregisterRequestC2B is the rpc request for a consumer to unregister to a broker.
-       UnregisterRequestC2B(metadata metadata.Metadata, sub *client.SubInfo) 
(*protocol.RegisterResponseB2C, error)
+       UnregisterRequestC2B(ctx context.Context, metadata metadata.Metadata, 
sub *client.SubInfo) (*protocol.RegisterResponseB2C, error)
        // GetMessageRequestC2B is the rpc request for a consumer to get message from a broker.
-       GetMessageRequestC2B(metadata *metadata.Metadata, sub *client.SubInfo, 
r *client.RmtDataCache) (*protocol.GetMessageResponseB2C, error)
+       GetMessageRequestC2B(ctx context.Context, metadata *metadata.Metadata, 
sub *client.SubInfo, r *client.RmtDataCache) (*protocol.GetMessageResponseB2C, 
error)
        // CommitOffsetRequestC2B is the rpc request for a consumer to commit offset to a broker.
-       CommitOffsetRequestC2B(metadata *metadata.Metadata, sub 
*client.SubInfo) (*protocol.CommitOffsetResponseB2C, error)
+       CommitOffsetRequestC2B(ctx context.Context, metadata 
*metadata.Metadata, sub *client.SubInfo) (*protocol.CommitOffsetResponseB2C, 
error)
        // HeartbeatRequestC2B is the rpc request for a consumer to send heartbeat to a broker.
-       HeartbeatRequestC2B(metadata *metadata.Metadata, sub *client.SubInfo, r 
*client.RmtDataCache) (*protocol.HeartBeatResponseB2C, error)
+       HeartbeatRequestC2B(ctx context.Context, metadata *metadata.Metadata, 
sub *client.SubInfo, r *client.RmtDataCache) (*protocol.HeartBeatResponseB2C, 
error)
        // RegisterRequestC2M is the rpc request for a consumer to register request to master.
-       RegisterRequestC2M(metadata *metadata.Metadata, sub *client.SubInfo, r 
*client.RmtDataCache) (*protocol.RegisterResponseM2C, error)
+       RegisterRequestC2M(ctx context.Context, metadata *metadata.Metadata, 
sub *client.SubInfo, r *client.RmtDataCache) (*protocol.RegisterResponseM2C, 
error)
        // HeartRequestC2M is the rpc request for a consumer to send heartbeat to master.
-       HeartRequestC2M(metadata *metadata.Metadata, sub *client.SubInfo, r 
*client.RmtDataCache) (*protocol.HeartResponseM2C, error)
+       HeartRequestC2M(ctx context.Context, metadata *metadata.Metadata, sub 
*client.SubInfo, r *client.RmtDataCache) (*protocol.HeartResponseM2C, error)
        // CloseRequestC2M is the rpc request for a consumer to be closed to master.
-       CloseRequestC2M(metadata *metadata.Metadata, sub *client.SubInfo) 
(*protocol.CloseResponseM2C, error)
+       CloseRequestC2M(ctx context.Context, metadata *metadata.Metadata, sub 
*client.SubInfo) (*protocol.CloseResponseM2C, error)
+}
+
+// New returns a default TubeMQ rpc Client
+func New(pool *multiplexing.Pool, opts *transport.Options) RPCClient {
+       return &rpcClient{
+               pool:   pool,
+               client: transport.New(opts, pool),
+       }
 }
 
 type rpcClient struct {
@@ -58,11 +70,19 @@ type rpcClient struct {
        config *config.Config
 }
 
-// New returns a default TubeMQ rpc Client
-func New(pool *multiplexing.Pool, opts *transport.Options, config 
*config.Config) RPCClient {
-       return &rpcClient{
-               pool:   pool,
-               client: transport.New(opts, pool),
-               config: config,
+func (c *rpcClient) doRequest(ctx context.Context, req codec.RPCRequest) 
(*protocol.RspResponseBody, error) {
+       rsp, err := c.client.DoRequest(ctx, req)
+       if err != nil {
+               return nil, errs.New(errs.RetRequestFailure, err.Error())
+       }
+
+       if _, ok := rsp.(*codec.TubeMQRPCResponse); !ok {
+               return nil, errs.ErrAssertionFailure
+       }
+
+       v := rsp.(*codec.TubeMQRPCResponse)
+       if v.ResponseException != nil {
+               return nil, errs.New(errs.RetResponseException, 
v.ResponseException.String())
        }
+       return v.ResponseBody, nil
 }
diff --git a/tubemq-client-twins/tubemq-client-go/rpc/master.go b/tubemq-client-twins/tubemq-client-go/rpc/master.go
index 38e2b68..9eb336a 100644
--- a/tubemq-client-twins/tubemq-client-go/rpc/master.go
+++ b/tubemq-client-twins/tubemq-client-go/rpc/master.go
@@ -30,16 +30,16 @@ import (
 )
 
 const (
-       MasterProducerRegister = iota + 1
-       MasterProducerHeartbeat
-       MasterProducerClose
-       MasterConsumerRegister
-       MasterConsumerHeartbeat
-       MasterConsumerClose
+       masterProducerRegister = iota + 1
+       masterProducerHeartbeat
+       masterProducerClose
+       masterConsumerRegister
+       masterConsumerHeartbeat
+       masterConsumerClose
 )
 
 // RegisterRequestRequestC2M implements the RegisterRequestRequestC2M interface according to TubeMQ RPC protocol.
-func (c *rpcClient) RegisterRequestC2M(metadata *metadata.Metadata, sub 
*client.SubInfo, r *client.RmtDataCache) (*protocol.RegisterResponseM2C, error) 
{
+func (c *rpcClient) RegisterRequestC2M(ctx context.Context, metadata 
*metadata.Metadata, sub *client.SubInfo, r *client.RmtDataCache) 
(*protocol.RegisterResponseM2C, error) {
        reqC2M := &protocol.RegisterRequestC2M{
                ClientId:         proto.String(sub.GetClientID()),
                HostName:         proto.String(metadata.GetNode().GetHost()),
@@ -83,34 +83,30 @@ func (c *rpcClient) RegisterRequestC2M(metadata *metadata.Metadata, sub *client.
                Flag: proto.Int32(0),
        }
        req.RequestHeader = &protocol.RequestHeader{
-               ServiceType: proto.Int32(AdminService),
+               ServiceType: proto.Int32(masterService),
                ProtocolVer: proto.Int32(2),
        }
        req.RequestBody = &protocol.RequestBody{
-               Method:  proto.Int32(MasterConsumerRegister),
+               Method:  proto.Int32(masterConsumerRegister),
                Request: data,
                Timeout: proto.Int64(c.config.Net.ReadTimeout.Milliseconds()),
        }
 
-       ctx, cancel := context.WithTimeout(context.Background(), 
c.config.Net.ReadTimeout)
-       defer cancel()
-       rsp, err := c.client.DoRequest(ctx, req)
-       if v, ok := rsp.(*codec.TubeMQRPCResponse); ok {
-               if v.ResponseException != nil {
-                       return nil, errs.New(errs.RetResponseException, 
err.Error())
-               }
-               rspM2C := &protocol.RegisterResponseM2C{}
-               err := proto.Unmarshal(v.ResponseBody.Data, rspM2C)
-               if err != nil {
-                       return nil, errs.New(errs.RetUnMarshalFailure, 
err.Error())
-               }
-               return rspM2C, nil
+       rspBody, err := c.doRequest(ctx, req)
+       if err != nil {
+               return nil, err
+       }
+
+       rspM2C := &protocol.RegisterResponseM2C{}
+       err = proto.Unmarshal(rspBody.Data, rspM2C)
+       if err != nil {
+               return nil, errs.New(errs.RetUnMarshalFailure, err.Error())
        }
-       return nil, errs.ErrAssertionFailure
+       return rspM2C, nil
 }
 
 // HeartRequestC2M implements the HeartRequestC2M interface according to TubeMQ RPC protocol.
-func (c *rpcClient) HeartRequestC2M(metadata *metadata.Metadata, sub 
*client.SubInfo, r *client.RmtDataCache) (*protocol.HeartResponseM2C, error) {
+func (c *rpcClient) HeartRequestC2M(ctx context.Context, metadata 
*metadata.Metadata, sub *client.SubInfo, r *client.RmtDataCache) 
(*protocol.HeartResponseM2C, error) {
        reqC2M := &protocol.HeartRequestC2M{
                ClientId:            proto.String(sub.GetClientID()),
                GroupName:           
proto.String(metadata.GetSubscribeInfo().GetGroup()),
@@ -151,36 +147,33 @@ func (c *rpcClient) HeartRequestC2M(metadata *metadata.Metadata, sub *client.Sub
        }
        req := codec.NewRPCRequest()
        req.RequestHeader = &protocol.RequestHeader{
-               ServiceType: proto.Int32(AdminService),
+               ServiceType: proto.Int32(masterService),
                ProtocolVer: proto.Int32(2),
        }
        req.RequestBody = &protocol.RequestBody{
-               Method:  proto.Int32(MasterConsumerHeartbeat),
+               Method:  proto.Int32(masterConsumerHeartbeat),
                Request: data,
                Timeout: proto.Int64(c.config.Net.ReadTimeout.Milliseconds()),
        }
        req.RpcHeader = &protocol.RpcConnHeader{
                Flag: proto.Int32(0),
        }
-       ctx, cancel := context.WithTimeout(context.Background(), 
c.config.Net.ReadTimeout)
-       defer cancel()
-       rsp, err := c.client.DoRequest(ctx, req)
-       if v, ok := rsp.(*codec.TubeMQRPCResponse); ok {
-               if v.ResponseException != nil {
-                       return nil, errs.New(errs.RetResponseException, 
err.Error())
-               }
-               rspM2C := &protocol.HeartResponseM2C{}
-               err := proto.Unmarshal(v.ResponseBody.Data, rspM2C)
-               if err != nil {
-                       return nil, errs.New(errs.RetUnMarshalFailure, 
err.Error())
-               }
-               return rspM2C, nil
+
+       rspBody, err := c.doRequest(ctx, req)
+       if err != nil {
+               return nil, err
        }
-       return nil, errs.ErrAssertionFailure
+
+       rspM2C := &protocol.HeartResponseM2C{}
+       err = proto.Unmarshal(rspBody.Data, rspM2C)
+       if err != nil {
+               return nil, errs.New(errs.RetUnMarshalFailure, err.Error())
+       }
+       return rspM2C, nil
 }
 
 // CloseRequestC2M implements the CloseRequestC2M interface according to TubeMQ RPC protocol.
-func (c *rpcClient) CloseRequestC2M(metadata *metadata.Metadata, sub 
*client.SubInfo) (*protocol.CloseResponseM2C, error) {
+func (c *rpcClient) CloseRequestC2M(ctx context.Context, metadata 
*metadata.Metadata, sub *client.SubInfo) (*protocol.CloseResponseM2C, error) {
        reqC2M := &protocol.CloseRequestC2M{
                ClientId:  proto.String(sub.GetClientID()),
                GroupName: proto.String(metadata.GetSubscribeInfo().GetGroup()),
@@ -192,30 +185,27 @@ func (c *rpcClient) CloseRequestC2M(metadata *metadata.Metadata, sub *client.Sub
        }
        req := codec.NewRPCRequest()
        req.RequestHeader = &protocol.RequestHeader{
-               ServiceType: proto.Int32(AdminService),
+               ServiceType: proto.Int32(masterService),
                ProtocolVer: proto.Int32(2),
        }
        req.RequestBody = &protocol.RequestBody{
-               Method:  proto.Int32(MasterConsumerClose),
+               Method:  proto.Int32(masterConsumerClose),
                Request: data,
                Timeout: proto.Int64(c.config.Net.ReadTimeout.Milliseconds()),
        }
        req.RpcHeader = &protocol.RpcConnHeader{
                Flag: proto.Int32(0),
        }
-       ctx, cancel := context.WithTimeout(context.Background(), 
c.config.Net.ReadTimeout)
-       defer cancel()
-       rsp, err := c.client.DoRequest(ctx, req)
-       if v, ok := rsp.(*codec.TubeMQRPCResponse); ok {
-               if v.ResponseException != nil {
-                       return nil, errs.New(errs.RetResponseException, 
err.Error())
-               }
-               rspM2C := &protocol.CloseResponseM2C{}
-               err := proto.Unmarshal(v.ResponseBody.Data, rspM2C)
-               if err != nil {
-                       return nil, errs.New(errs.RetUnMarshalFailure, 
err.Error())
-               }
-               return rspM2C, nil
+
+       rspBody, err := c.doRequest(ctx, req)
+       if err != nil {
+               return nil, err
+       }
+
+       rspM2C := &protocol.CloseResponseM2C{}
+       err = proto.Unmarshal(rspBody.Data, rspM2C)
+       if err != nil {
+               return nil, errs.New(errs.RetUnMarshalFailure, err.Error())
        }
-       return nil, errs.ErrAssertionFailure
+       return rspM2C, nil
 }

Reply via email to