This is an automated email from the ASF dual-hosted git repository. gosonzhang pushed a commit to branch INLONG-25 in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
commit ddfab69aa7ed27c9b3b46c49338a87030349f245 Author: Zijie Lu <[email protected]> AuthorDate: Mon May 24 16:34:56 2021 +0800 Pass context from client Signed-off-by: Zijie Lu <[email protected]> --- .../tubemq-client-go/client/remote.go | 16 +- tubemq-client-twins/tubemq-client-go/errs/errs.go | 15 +- tubemq-client-twins/tubemq-client-go/rpc/broker.go | 181 ++++++++++----------- tubemq-client-twins/tubemq-client-go/rpc/client.go | 52 ++++-- tubemq-client-twins/tubemq-client-go/rpc/master.go | 104 ++++++------ 5 files changed, 186 insertions(+), 182 deletions(-) diff --git a/tubemq-client-twins/tubemq-client-go/client/remote.go b/tubemq-client-twins/tubemq-client-go/client/remote.go index fd60425..e36b69f 100644 --- a/tubemq-client-twins/tubemq-client-go/client/remote.go +++ b/tubemq-client-twins/tubemq-client-go/client/remote.go @@ -27,14 +27,14 @@ import ( // RmtDataCache represents the data returned from TubeMQ. type RmtDataCache struct { - consumerID string - groupName string - underGroupCtrl bool - defFlowCtrlID int64 - groupFlowCtrlID int64 - subscribeInfo []*metadata.SubscribeInfo - rebalanceResults []*metadata.ConsumerEvent - mu sync.Mutex + consumerID string + groupName string + underGroupCtrl bool + defFlowCtrlID int64 + groupFlowCtrlID int64 + subscribeInfo []*metadata.SubscribeInfo + rebalanceResults []*metadata.ConsumerEvent + mu sync.Mutex brokerToPartitions map[*metadata.Node][]*metadata.Partition } diff --git a/tubemq-client-twins/tubemq-client-go/errs/errs.go b/tubemq-client-twins/tubemq-client-go/errs/errs.go index 637e381..0ef803d 100644 --- a/tubemq-client-twins/tubemq-client-go/errs/errs.go +++ b/tubemq-client-twins/tubemq-client-go/errs/errs.go @@ -23,12 +23,19 @@ import ( ) const ( - RetMarshalFailure = 1 + // RetMarshalFailure represents the error code of marshal error. + RetMarshalFailure = 1 + // RetResponseException represents the error code of response exception. RetResponseException = 2 - RetUnMarshalFailure = 3 - RetAssertionFailure = 4 + // RetUnMarshalFailure represents the error code of unmarshal error. + RetUnMarshalFailure = 3 + // RetAssertionFailure represents the error code of assertion error. + RetAssertionFailure = 4 + // RetRequestFailure represents the error code of request error. + RetRequestFailure = 5 ) +// ErrAssertionFailure represents RetAssertionFailure error. var ErrAssertionFailure = New(RetAssertionFailure, "AssertionFailure") // Error provides a TubeMQ-specific error container @@ -37,10 +44,12 @@ type Error struct { Msg string } +// Error() implements the Error interface. func (e *Error) Error() string { return fmt.Sprintf("code: %d, msg:%s", e.Code, e.Msg) } +// New returns a self-defined error. func New(code int32, msg string) error { err := &Error{ Code: code, diff --git a/tubemq-client-twins/tubemq-client-go/rpc/broker.go b/tubemq-client-twins/tubemq-client-go/rpc/broker.go index b2b614d..7f9ead2 100644 --- a/tubemq-client-twins/tubemq-client-go/rpc/broker.go +++ b/tubemq-client-twins/tubemq-client-go/rpc/broker.go @@ -30,26 +30,26 @@ import ( ) const ( - Register = 31 - Unregister = 32 + register = 31 + unregister = 32 ) const ( - BrokerProducerRegister = iota + 11 - BrokerProducerHeartbeat - BrokerProducerSendMsg - BrokerProducerClose - BrokerConsumerRegister - BrokerConsumerHeartbeat - BrokerConsumerGetMsg - BrokerConsumerCommit - BrokerConsumerClose + brokerProducerRegister = iota + 11 + brokerProducerHeartbeat + brokerProducerSendMsg + brokerProducerClose + brokerConsumerRegister + brokerConsumerHeartbeat + brokerConsumerGetMsg + brokerConsumerCommit + brokerConsumerClose ) // RegisterRequestC2B implements the RegisterRequestC2B interface according to TubeMQ RPC protocol. -func (c *rpcClient) RegisterRequestC2B(metadata *metadata.Metadata, sub *client.SubInfo) (*protocol.RegisterResponseB2C, error) { +func (c *rpcClient) RegisterRequestC2B(ctx context.Context, metadata *metadata.Metadata, sub *client.SubInfo) (*protocol.RegisterResponseB2C, error) { reqC2B := &protocol.RegisterRequestC2B{ - OpType: proto.Int32(Register), + OpType: proto.Int32(register), ClientId: proto.String(sub.GetClientID()), GroupName: proto.String(metadata.GetSubscribeInfo().GetGroup()), TopicName: proto.String(metadata.GetSubscribeInfo().GetPartition().GetTopic()), @@ -78,35 +78,32 @@ func (c *rpcClient) RegisterRequestC2B(metadata *metadata.Metadata, sub *client. Flag: proto.Int32(0), } req.RequestHeader = &protocol.RequestHeader{ - ServiceType: proto.Int32(ReadService), + ServiceType: proto.Int32(brokerReadService), ProtocolVer: proto.Int32(2), } req.RequestBody = &protocol.RequestBody{ - Method: proto.Int32(BrokerConsumerRegister), + Method: proto.Int32(brokerConsumerRegister), Request: data, Timeout: proto.Int64(c.config.Net.ReadTimeout.Milliseconds()), } - ctx, cancel := context.WithTimeout(context.Background(), c.config.Net.ReadTimeout) - defer cancel() - rsp, err := c.client.DoRequest(ctx, req) - if v, ok := rsp.(*codec.TubeMQRPCResponse); ok { - if v.ResponseException != nil { - return nil, errs.New(errs.RetResponseException, v.ResponseException.String()) - } - rspC2B := &protocol.RegisterResponseB2C{} - err := proto.Unmarshal(v.ResponseBody.Data, rspC2B) - if err != nil { - return nil, errs.New(errs.RetUnMarshalFailure, err.Error()) - } - return rspC2B, nil + + rspBody, err := c.doRequest(ctx, req) + if err != nil { + return nil, err } - return nil, errs.ErrAssertionFailure + + rspC2B := &protocol.RegisterResponseB2C{} + err = proto.Unmarshal(rspBody.Data, rspC2B) + if err != nil { + return nil, errs.New(errs.RetUnMarshalFailure, err.Error()) + } + return rspC2B, nil } // UnregisterRequestC2B implements the UnregisterRequestC2B interface according to TubeMQ RPC protocol. -func (c *rpcClient) UnregisterRequestC2B(metadata metadata.Metadata, sub *client.SubInfo) (*protocol.RegisterResponseB2C, error) { +func (c *rpcClient) UnregisterRequestC2B(ctx context.Context, metadata metadata.Metadata, sub *client.SubInfo) (*protocol.RegisterResponseB2C, error) { reqC2B := &protocol.RegisterRequestC2B{ - OpType: proto.Int32(Unregister), + OpType: proto.Int32(unregister), ClientId: proto.String(sub.GetClientID()), GroupName: proto.String(metadata.GetSubscribeInfo().GetGroup()), TopicName: proto.String(metadata.GetSubscribeInfo().GetPartition().GetTopic()), @@ -123,33 +120,30 @@ func (c *rpcClient) UnregisterRequestC2B(metadata metadata.Metadata, sub *client return nil, errs.New(errs.RetMarshalFailure, err.Error()) } req.RequestHeader = &protocol.RequestHeader{ - ServiceType: proto.Int32(ReadService), + ServiceType: proto.Int32(brokerReadService), ProtocolVer: proto.Int32(2), } req.RequestBody = &protocol.RequestBody{ - Method: proto.Int32(BrokerConsumerRegister), + Method: proto.Int32(brokerConsumerRegister), Request: data, Timeout: proto.Int64(c.config.Net.ReadTimeout.Milliseconds()), } - ctx, cancel := context.WithTimeout(context.Background(), c.config.Net.ReadTimeout) - defer cancel() - rsp, err := c.client.DoRequest(ctx, req) - if v, ok := rsp.(*codec.TubeMQRPCResponse); ok { - if v.ResponseException != nil { - return nil, errs.New(errs.RetResponseException, v.ResponseException.String()) - } - rspC2B := &protocol.RegisterResponseB2C{} - err := proto.Unmarshal(v.ResponseBody.Data, rspC2B) - if err != nil { - return nil, errs.New(errs.RetUnMarshalFailure, err.Error()) - } - return rspC2B, nil + + rspBody, err := c.doRequest(ctx, req) + if err != nil { + return nil, err + } + + rspC2B := &protocol.RegisterResponseB2C{} + err = proto.Unmarshal(rspBody.Data, rspC2B) + if err != nil { + return nil, errs.New(errs.RetUnMarshalFailure, err.Error()) } - return nil, errs.ErrAssertionFailure + return rspC2B, nil } // GetMessageRequestC2B implements the GetMessageRequestC2B interface according to TubeMQ RPC protocol. -func (c *rpcClient) GetMessageRequestC2B(metadata *metadata.Metadata, sub *client.SubInfo, r *client.RmtDataCache) (*protocol.GetMessageResponseB2C, error) { +func (c *rpcClient) GetMessageRequestC2B(ctx context.Context, metadata *metadata.Metadata, sub *client.SubInfo, r *client.RmtDataCache) (*protocol.GetMessageResponseB2C, error) { reqC2B := &protocol.GetMessageRequestC2B{ ClientId: proto.String(sub.GetClientID()), PartitionId: proto.Int32(metadata.GetSubscribeInfo().GetPartition().GetPartitionID()), @@ -168,33 +162,30 @@ func (c *rpcClient) GetMessageRequestC2B(metadata *metadata.Metadata, sub *clien return nil, errs.New(errs.RetMarshalFailure, err.Error()) } req.RequestHeader = &protocol.RequestHeader{ - ServiceType: proto.Int32(ReadService), + ServiceType: proto.Int32(brokerReadService), ProtocolVer: proto.Int32(2), } req.RequestBody = &protocol.RequestBody{ - Method: proto.Int32(BrokerConsumerGetMsg), + Method: proto.Int32(brokerConsumerGetMsg), Request: data, Timeout: proto.Int64(c.config.Net.ReadTimeout.Milliseconds()), } - ctx, cancel := context.WithTimeout(context.Background(), c.config.Net.ReadTimeout) - defer cancel() - rsp, err := c.client.DoRequest(ctx, req) - if v, ok := rsp.(*codec.TubeMQRPCResponse); ok { - if v.ResponseException != nil { - return nil, errs.New(errs.RetResponseException, v.ResponseException.String()) - } - rspC2B := &protocol.GetMessageResponseB2C{} - err := proto.Unmarshal(v.ResponseBody.Data, rspC2B) - if err != nil { - return nil, errs.New(errs.RetUnMarshalFailure, err.Error()) - } - return rspC2B, nil + + rspBody, err := c.doRequest(ctx, req) + if err != nil { + return nil, err } - return nil, errs.ErrAssertionFailure + + rspC2B := &protocol.GetMessageResponseB2C{} + err = proto.Unmarshal(rspBody.Data, rspC2B) + if err != nil { + return nil, errs.New(errs.RetUnMarshalFailure, err.Error()) + } + return rspC2B, nil } // CommitOffsetRequestC2B implements the CommitOffsetRequestC2B interface according to TubeMQ RPC protocol. -func (c *rpcClient) CommitOffsetRequestC2B(metadata *metadata.Metadata, sub *client.SubInfo) (*protocol.CommitOffsetResponseB2C, error) { +func (c *rpcClient) CommitOffsetRequestC2B(ctx context.Context, metadata *metadata.Metadata, sub *client.SubInfo) (*protocol.CommitOffsetResponseB2C, error) { reqC2B := &protocol.CommitOffsetRequestC2B{ ClientId: proto.String(sub.GetClientID()), TopicName: proto.String(metadata.GetSubscribeInfo().GetPartition().GetTopic()), @@ -211,33 +202,30 @@ func (c *rpcClient) CommitOffsetRequestC2B(metadata *metadata.Metadata, sub *cli return nil, errs.New(errs.RetMarshalFailure, err.Error()) } req.RequestHeader = &protocol.RequestHeader{ - ServiceType: proto.Int32(ReadService), + ServiceType: proto.Int32(brokerReadService), ProtocolVer: proto.Int32(2), } req.RequestBody = &protocol.RequestBody{ - Method: proto.Int32(BrokerConsumerHeartbeat), + Method: proto.Int32(brokerConsumerHeartbeat), Request: data, Timeout: proto.Int64(c.config.Net.ReadTimeout.Milliseconds()), } - ctx, cancel := context.WithTimeout(context.Background(), c.config.Net.ReadTimeout) - defer cancel() - rsp, err := c.client.DoRequest(ctx, req) - if v, ok := rsp.(*codec.TubeMQRPCResponse); ok { - if v.ResponseException != nil { - return nil, errs.New(errs.RetResponseException, v.ResponseException.String()) - } - rspC2B := &protocol.CommitOffsetResponseB2C{} - err := proto.Unmarshal(v.ResponseBody.Data, rspC2B) - if err != nil { - return nil, errs.New(errs.RetUnMarshalFailure, err.Error()) - } - return rspC2B, nil + + rspBody, err := c.doRequest(ctx, req) + if err != nil { + return nil, err } - return nil, errs.ErrAssertionFailure + + rspC2B := &protocol.CommitOffsetResponseB2C{} + err = proto.Unmarshal(rspBody.Data, rspC2B) + if err != nil { + return nil, errs.New(errs.RetUnMarshalFailure, err.Error()) + } + return rspC2B, nil } // HeartbeatRequestC2B implements the HeartbeatRequestC2B interface according to TubeMQ RPC protocol. -func (c *rpcClient) HeartbeatRequestC2B(metadata *metadata.Metadata, sub *client.SubInfo, r *client.RmtDataCache) (*protocol.HeartBeatResponseB2C, error) { +func (c *rpcClient) HeartbeatRequestC2B(ctx context.Context, metadata *metadata.Metadata, sub *client.SubInfo, r *client.RmtDataCache) (*protocol.HeartBeatResponseB2C, error) { reqC2B := &protocol.HeartBeatRequestC2B{ ClientId: proto.String(sub.GetClientID()), GroupName: proto.String(metadata.GetSubscribeInfo().GetGroup()), @@ -256,30 +244,27 @@ func (c *rpcClient) HeartbeatRequestC2B(metadata *metadata.Metadata, sub *client } req := codec.NewRPCRequest() req.RequestHeader = &protocol.RequestHeader{ - ServiceType: proto.Int32(ReadService), + ServiceType: proto.Int32(brokerReadService), ProtocolVer: proto.Int32(2), } req.RequestBody = &protocol.RequestBody{ - Method: proto.Int32(BrokerConsumerHeartbeat), + Method: proto.Int32(brokerConsumerHeartbeat), Request: data, Timeout: proto.Int64(c.config.Net.ReadTimeout.Milliseconds()), } req.RpcHeader = &protocol.RpcConnHeader{ Flag: proto.Int32(0), } - ctx, cancel := context.WithTimeout(context.Background(), c.config.Net.ReadTimeout) - defer cancel() - rsp, err := c.client.DoRequest(ctx, req) - if v, ok := rsp.(*codec.TubeMQRPCResponse); ok { - if v.ResponseException != nil { - return nil, errs.New(errs.RetResponseException, v.ResponseException.String()) - } - rspC2B := &protocol.HeartBeatResponseB2C{} - err := proto.Unmarshal(v.ResponseBody.Data, rspC2B) - if err != nil { - return nil, errs.New(errs.RetUnMarshalFailure, err.Error()) - } - return rspC2B, nil + + rspBody, err := c.doRequest(ctx, req) + if err != nil { + return nil, err + } + + rspC2B := &protocol.HeartBeatResponseB2C{} + err = proto.Unmarshal(rspBody.Data, rspC2B) + if err != nil { + return nil, errs.New(errs.RetUnMarshalFailure, err.Error()) } - return nil, errs.ErrAssertionFailure + return rspC2B, nil } diff --git a/tubemq-client-twins/tubemq-client-go/rpc/client.go b/tubemq-client-twins/tubemq-client-go/rpc/client.go index 46961cd..71123c7 100644 --- a/tubemq-client-twins/tubemq-client-go/rpc/client.go +++ b/tubemq-client-twins/tubemq-client-go/rpc/client.go @@ -19,8 +19,12 @@ package rpc import ( + "context" + "github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/client" + "github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/codec" "github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/config" + "github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/errs" "github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/metadata" "github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/multiplexing" "github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/protocol" @@ -28,28 +32,36 @@ import ( ) const ( - ReadService = 2 - AdminService = 4 + brokerReadService = 2 + masterService = 4 ) // RPCClient is the rpc level client to interact with TubeMQ. type RPCClient interface { // RegisterRequestC2B is the rpc request for a consumer to register to a broker. - RegisterRequestC2B(metadata *metadata.Metadata, sub *client.SubInfo) (*protocol.RegisterResponseB2C, error) + RegisterRequestC2B(ctx context.Context, metadata *metadata.Metadata, sub *client.SubInfo) (*protocol.RegisterResponseB2C, error) // UnregisterRequestC2B is the rpc request for a consumer to unregister to a broker. - UnregisterRequestC2B(metadata metadata.Metadata, sub *client.SubInfo) (*protocol.RegisterResponseB2C, error) + UnregisterRequestC2B(ctx context.Context, metadata metadata.Metadata, sub *client.SubInfo) (*protocol.RegisterResponseB2C, error) // GetMessageRequestC2B is the rpc request for a consumer to get message from a broker. - GetMessageRequestC2B(metadata *metadata.Metadata, sub *client.SubInfo, r *client.RmtDataCache) (*protocol.GetMessageResponseB2C, error) + GetMessageRequestC2B(ctx context.Context, metadata *metadata.Metadata, sub *client.SubInfo, r *client.RmtDataCache) (*protocol.GetMessageResponseB2C, error) // CommitOffsetRequestC2B is the rpc request for a consumer to commit offset to a broker. - CommitOffsetRequestC2B(metadata *metadata.Metadata, sub *client.SubInfo) (*protocol.CommitOffsetResponseB2C, error) + CommitOffsetRequestC2B(ctx context.Context, metadata *metadata.Metadata, sub *client.SubInfo) (*protocol.CommitOffsetResponseB2C, error) // HeartbeatRequestC2B is the rpc request for a consumer to send heartbeat to a broker. - HeartbeatRequestC2B(metadata *metadata.Metadata, sub *client.SubInfo, r *client.RmtDataCache) (*protocol.HeartBeatResponseB2C, error) + HeartbeatRequestC2B(ctx context.Context, metadata *metadata.Metadata, sub *client.SubInfo, r *client.RmtDataCache) (*protocol.HeartBeatResponseB2C, error) // RegisterRequestC2M is the rpc request for a consumer to register request to master. - RegisterRequestC2M(metadata *metadata.Metadata, sub *client.SubInfo, r *client.RmtDataCache) (*protocol.RegisterResponseM2C, error) + RegisterRequestC2M(ctx context.Context, metadata *metadata.Metadata, sub *client.SubInfo, r *client.RmtDataCache) (*protocol.RegisterResponseM2C, error) // HeartRequestC2M is the rpc request for a consumer to send heartbeat to master. - HeartRequestC2M(metadata *metadata.Metadata, sub *client.SubInfo, r *client.RmtDataCache) (*protocol.HeartResponseM2C, error) + HeartRequestC2M(ctx context.Context, metadata *metadata.Metadata, sub *client.SubInfo, r *client.RmtDataCache) (*protocol.HeartResponseM2C, error) // CloseRequestC2M is the rpc request for a consumer to be closed to master. - CloseRequestC2M(metadata *metadata.Metadata, sub *client.SubInfo) (*protocol.CloseResponseM2C, error) + CloseRequestC2M(ctx context.Context, metadata *metadata.Metadata, sub *client.SubInfo) (*protocol.CloseResponseM2C, error) +} + +// New returns a default TubeMQ rpc Client +func New(pool *multiplexing.Pool, opts *transport.Options) RPCClient { + return &rpcClient{ + pool: pool, + client: transport.New(opts, pool), + } } type rpcClient struct { @@ -58,11 +70,19 @@ type rpcClient struct { config *config.Config } -// New returns a default TubeMQ rpc Client -func New(pool *multiplexing.Pool, opts *transport.Options, config *config.Config) RPCClient { - return &rpcClient{ - pool: pool, - client: transport.New(opts, pool), - config: config, +func (c *rpcClient) doRequest(ctx context.Context, req codec.RPCRequest) (*protocol.RspResponseBody, error) { + rsp, err := c.client.DoRequest(ctx, req) + if err != nil { + return nil, errs.New(errs.RetRequestFailure, err.Error()) + } + + if _, ok := rsp.(*codec.TubeMQRPCResponse); !ok { + return nil, errs.ErrAssertionFailure + } + + v := rsp.(*codec.TubeMQRPCResponse) + if v.ResponseException != nil { + return nil, errs.New(errs.RetResponseException, v.ResponseException.String()) } + return v.ResponseBody, nil } diff --git a/tubemq-client-twins/tubemq-client-go/rpc/master.go b/tubemq-client-twins/tubemq-client-go/rpc/master.go index 38e2b68..9eb336a 100644 --- a/tubemq-client-twins/tubemq-client-go/rpc/master.go +++ b/tubemq-client-twins/tubemq-client-go/rpc/master.go @@ -30,16 +30,16 @@ import ( ) const ( - MasterProducerRegister = iota + 1 - MasterProducerHeartbeat - MasterProducerClose - MasterConsumerRegister - MasterConsumerHeartbeat - MasterConsumerClose + masterProducerRegister = iota + 1 + masterProducerHeartbeat + masterProducerClose + masterConsumerRegister + masterConsumerHeartbeat + masterConsumerClose ) // RegisterRequestRequestC2M implements the RegisterRequestRequestC2M interface according to TubeMQ RPC protocol. -func (c *rpcClient) RegisterRequestC2M(metadata *metadata.Metadata, sub *client.SubInfo, r *client.RmtDataCache) (*protocol.RegisterResponseM2C, error) { +func (c *rpcClient) RegisterRequestC2M(ctx context.Context, metadata *metadata.Metadata, sub *client.SubInfo, r *client.RmtDataCache) (*protocol.RegisterResponseM2C, error) { reqC2M := &protocol.RegisterRequestC2M{ ClientId: proto.String(sub.GetClientID()), HostName: proto.String(metadata.GetNode().GetHost()), @@ -83,34 +83,30 @@ func (c *rpcClient) RegisterRequestC2M(metadata *metadata.Metadata, sub *client. Flag: proto.Int32(0), } req.RequestHeader = &protocol.RequestHeader{ - ServiceType: proto.Int32(AdminService), + ServiceType: proto.Int32(masterService), ProtocolVer: proto.Int32(2), } req.RequestBody = &protocol.RequestBody{ - Method: proto.Int32(MasterConsumerRegister), + Method: proto.Int32(masterConsumerRegister), Request: data, Timeout: proto.Int64(c.config.Net.ReadTimeout.Milliseconds()), } - ctx, cancel := context.WithTimeout(context.Background(), c.config.Net.ReadTimeout) - defer cancel() - rsp, err := c.client.DoRequest(ctx, req) - if v, ok := rsp.(*codec.TubeMQRPCResponse); ok { - if v.ResponseException != nil { - return nil, errs.New(errs.RetResponseException, err.Error()) - } - rspM2C := &protocol.RegisterResponseM2C{} - err := proto.Unmarshal(v.ResponseBody.Data, rspM2C) - if err != nil { - return nil, errs.New(errs.RetUnMarshalFailure, err.Error()) - } - return rspM2C, nil + rspBody, err := c.doRequest(ctx, req) + if err != nil { + return nil, err + } + + rspM2C := &protocol.RegisterResponseM2C{} + err = proto.Unmarshal(rspBody.Data, rspM2C) + if err != nil { + return nil, errs.New(errs.RetUnMarshalFailure, err.Error()) } - return nil, errs.ErrAssertionFailure + return rspM2C, nil } // HeartRequestC2M implements the HeartRequestC2M interface according to TubeMQ RPC protocol. -func (c *rpcClient) HeartRequestC2M(metadata *metadata.Metadata, sub *client.SubInfo, r *client.RmtDataCache) (*protocol.HeartResponseM2C, error) { +func (c *rpcClient) HeartRequestC2M(ctx context.Context, metadata *metadata.Metadata, sub *client.SubInfo, r *client.RmtDataCache) (*protocol.HeartResponseM2C, error) { reqC2M := &protocol.HeartRequestC2M{ ClientId: proto.String(sub.GetClientID()), GroupName: proto.String(metadata.GetSubscribeInfo().GetGroup()), @@ -151,36 +147,33 @@ func (c *rpcClient) HeartRequestC2M(metadata *metadata.Metadata, sub *client.Sub } req := codec.NewRPCRequest() req.RequestHeader = &protocol.RequestHeader{ - ServiceType: proto.Int32(AdminService), + ServiceType: proto.Int32(masterService), ProtocolVer: proto.Int32(2), } req.RequestBody = &protocol.RequestBody{ - Method: proto.Int32(MasterConsumerHeartbeat), + Method: proto.Int32(masterConsumerHeartbeat), Request: data, Timeout: proto.Int64(c.config.Net.ReadTimeout.Milliseconds()), } req.RpcHeader = &protocol.RpcConnHeader{ Flag: proto.Int32(0), } - ctx, cancel := context.WithTimeout(context.Background(), c.config.Net.ReadTimeout) - defer cancel() - rsp, err := c.client.DoRequest(ctx, req) - if v, ok := rsp.(*codec.TubeMQRPCResponse); ok { - if v.ResponseException != nil { - return nil, errs.New(errs.RetResponseException, err.Error()) - } - rspM2C := &protocol.HeartResponseM2C{} - err := proto.Unmarshal(v.ResponseBody.Data, rspM2C) - if err != nil { - return nil, errs.New(errs.RetUnMarshalFailure, err.Error()) - } - return rspM2C, nil + + rspBody, err := c.doRequest(ctx, req) + if err != nil { + return nil, err } - return nil, errs.ErrAssertionFailure + + rspM2C := &protocol.HeartResponseM2C{} + err = proto.Unmarshal(rspBody.Data, rspM2C) + if err != nil { + return nil, errs.New(errs.RetUnMarshalFailure, err.Error()) + } + return rspM2C, nil } // CloseRequestC2M implements the CloseRequestC2M interface according to TubeMQ RPC protocol. -func (c *rpcClient) CloseRequestC2M(metadata *metadata.Metadata, sub *client.SubInfo) (*protocol.CloseResponseM2C, error) { +func (c *rpcClient) CloseRequestC2M(ctx context.Context, metadata *metadata.Metadata, sub *client.SubInfo) (*protocol.CloseResponseM2C, error) { reqC2M := &protocol.CloseRequestC2M{ ClientId: proto.String(sub.GetClientID()), GroupName: proto.String(metadata.GetSubscribeInfo().GetGroup()), @@ -192,30 +185,27 @@ func (c *rpcClient) CloseRequestC2M(metadata *metadata.Metadata, sub *client.Sub } req := codec.NewRPCRequest() req.RequestHeader = &protocol.RequestHeader{ - ServiceType: proto.Int32(AdminService), + ServiceType: proto.Int32(masterService), ProtocolVer: proto.Int32(2), } req.RequestBody = &protocol.RequestBody{ - Method: proto.Int32(MasterConsumerClose), + Method: proto.Int32(masterConsumerClose), Request: data, Timeout: proto.Int64(c.config.Net.ReadTimeout.Milliseconds()), } req.RpcHeader = &protocol.RpcConnHeader{ Flag: proto.Int32(0), } - ctx, cancel := context.WithTimeout(context.Background(), c.config.Net.ReadTimeout) - defer cancel() - rsp, err := c.client.DoRequest(ctx, req) - if v, ok := rsp.(*codec.TubeMQRPCResponse); ok { - if v.ResponseException != nil { - return nil, errs.New(errs.RetResponseException, err.Error()) - } - rspM2C := &protocol.CloseResponseM2C{} - err := proto.Unmarshal(v.ResponseBody.Data, rspM2C) - if err != nil { - return nil, errs.New(errs.RetUnMarshalFailure, err.Error()) - } - return rspM2C, nil + + rspBody, err := c.doRequest(ctx, req) + if err != nil { + return nil, err + } + + rspM2C := &protocol.CloseResponseM2C{} + err = proto.Unmarshal(rspBody.Data, rspM2C) + if err != nil { + return nil, errs.New(errs.RetUnMarshalFailure, err.Error()) } - return nil, errs.ErrAssertionFailure + return rspM2C, nil }
