This is an automated email from the ASF dual-hosted git repository. gosonzhang pushed a commit to branch INLONG-25 in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
commit 501b6be59173072d4cfe27c1d97bc169af9feef9 Author: Zijie Lu <[email protected]> AuthorDate: Mon May 24 14:31:02 2021 +0800 Add license and construct context in each rpc request. Signed-off-by: Zijie Lu <[email protected]> --- .../tubemq-client-go/config/config.go | 20 ++++++++++++++++++++ tubemq-client-twins/tubemq-client-go/rpc/broker.go | 22 +++++++++++++++++----- tubemq-client-twins/tubemq-client-go/rpc/client.go | 6 +----- tubemq-client-twins/tubemq-client-go/rpc/master.go | 15 ++++++++++++--- 4 files changed, 50 insertions(+), 13 deletions(-) diff --git a/tubemq-client-twins/tubemq-client-go/config/config.go b/tubemq-client-twins/tubemq-client-go/config/config.go index 999d87a..f290f07 100644 --- a/tubemq-client-twins/tubemq-client-go/config/config.go +++ b/tubemq-client-twins/tubemq-client-go/config/config.go @@ -1,10 +1,30 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +// package config defines all the TubeMQ configuration options. package config import ( "time" ) +// Config defines multiple configuration options. type Config struct { + // Net is the namespace for network-level properties used by Broker and Master. Net struct { // How long to wait for a response. 
ReadTimeout time.Duration diff --git a/tubemq-client-twins/tubemq-client-go/rpc/broker.go b/tubemq-client-twins/tubemq-client-go/rpc/broker.go index aa7b831..b2b614d 100644 --- a/tubemq-client-twins/tubemq-client-go/rpc/broker.go +++ b/tubemq-client-twins/tubemq-client-go/rpc/broker.go @@ -18,6 +18,8 @@ package rpc import ( + "context" + "github.com/golang/protobuf/proto" "github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/client" @@ -84,7 +86,9 @@ func (c *rpcClient) RegisterRequestC2B(metadata *metadata.Metadata, sub *client. Request: data, Timeout: proto.Int64(c.config.Net.ReadTimeout.Milliseconds()), } - rsp, err := c.client.DoRequest(c.ctx, req) + ctx, cancel := context.WithTimeout(context.Background(), c.config.Net.ReadTimeout) + defer cancel() + rsp, err := c.client.DoRequest(ctx, req) if v, ok := rsp.(*codec.TubeMQRPCResponse); ok { if v.ResponseException != nil { return nil, errs.New(errs.RetResponseException, v.ResponseException.String()) @@ -127,7 +131,9 @@ func (c *rpcClient) UnregisterRequestC2B(metadata metadata.Metadata, sub *client Request: data, Timeout: proto.Int64(c.config.Net.ReadTimeout.Milliseconds()), } - rsp, err := c.client.DoRequest(c.ctx, req) + ctx, cancel := context.WithTimeout(context.Background(), c.config.Net.ReadTimeout) + defer cancel() + rsp, err := c.client.DoRequest(ctx, req) if v, ok := rsp.(*codec.TubeMQRPCResponse); ok { if v.ResponseException != nil { return nil, errs.New(errs.RetResponseException, v.ResponseException.String()) @@ -170,7 +176,9 @@ func (c *rpcClient) GetMessageRequestC2B(metadata *metadata.Metadata, sub *clien Request: data, Timeout: proto.Int64(c.config.Net.ReadTimeout.Milliseconds()), } - rsp, err := c.client.DoRequest(c.ctx, req) + ctx, cancel := context.WithTimeout(context.Background(), c.config.Net.ReadTimeout) + defer cancel() + rsp, err := c.client.DoRequest(ctx, req) if v, ok := rsp.(*codec.TubeMQRPCResponse); ok { if v.ResponseException != nil { return nil, 
errs.New(errs.RetResponseException, v.ResponseException.String()) @@ -211,7 +219,9 @@ func (c *rpcClient) CommitOffsetRequestC2B(metadata *metadata.Metadata, sub *cli Request: data, Timeout: proto.Int64(c.config.Net.ReadTimeout.Milliseconds()), } - rsp, err := c.client.DoRequest(c.ctx, req) + ctx, cancel := context.WithTimeout(context.Background(), c.config.Net.ReadTimeout) + defer cancel() + rsp, err := c.client.DoRequest(ctx, req) if v, ok := rsp.(*codec.TubeMQRPCResponse); ok { if v.ResponseException != nil { return nil, errs.New(errs.RetResponseException, v.ResponseException.String()) @@ -257,7 +267,9 @@ func (c *rpcClient) HeartbeatRequestC2B(metadata *metadata.Metadata, sub *client req.RpcHeader = &protocol.RpcConnHeader{ Flag: proto.Int32(0), } - rsp, err := c.client.DoRequest(c.ctx, req) + ctx, cancel := context.WithTimeout(context.Background(), c.config.Net.ReadTimeout) + defer cancel() + rsp, err := c.client.DoRequest(ctx, req) if v, ok := rsp.(*codec.TubeMQRPCResponse); ok { if v.ResponseException != nil { return nil, errs.New(errs.RetResponseException, v.ResponseException.String()) diff --git a/tubemq-client-twins/tubemq-client-go/rpc/client.go b/tubemq-client-twins/tubemq-client-go/rpc/client.go index 79a446e..46961cd 100644 --- a/tubemq-client-twins/tubemq-client-go/rpc/client.go +++ b/tubemq-client-twins/tubemq-client-go/rpc/client.go @@ -19,8 +19,6 @@ package rpc import ( - "context" - "github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/client" "github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/config" "github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/metadata" @@ -57,16 +55,14 @@ type RPCClient interface { type rpcClient struct { pool *multiplexing.Pool client *transport.Client - ctx context.Context config *config.Config } // New returns a default TubeMQ rpc Client -func New(pool *multiplexing.Pool, opts *transport.Options, ctx context.Context, config *config.Config) RPCClient { 
+func New(pool *multiplexing.Pool, opts *transport.Options, config *config.Config) RPCClient { return &rpcClient{ pool: pool, client: transport.New(opts, pool), - ctx: ctx, config: config, } } diff --git a/tubemq-client-twins/tubemq-client-go/rpc/master.go b/tubemq-client-twins/tubemq-client-go/rpc/master.go index 4f9e7f9..38e2b68 100644 --- a/tubemq-client-twins/tubemq-client-go/rpc/master.go +++ b/tubemq-client-twins/tubemq-client-go/rpc/master.go @@ -18,6 +18,8 @@ package rpc import ( + "context" + "github.com/golang/protobuf/proto" "github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/client" @@ -89,7 +91,10 @@ func (c *rpcClient) RegisterRequestC2M(metadata *metadata.Metadata, sub *client. Request: data, Timeout: proto.Int64(c.config.Net.ReadTimeout.Milliseconds()), } - rsp, err := c.client.DoRequest(c.ctx, req) + + ctx, cancel := context.WithTimeout(context.Background(), c.config.Net.ReadTimeout) + defer cancel() + rsp, err := c.client.DoRequest(ctx, req) if v, ok := rsp.(*codec.TubeMQRPCResponse); ok { if v.ResponseException != nil { return nil, errs.New(errs.RetResponseException, err.Error()) @@ -157,7 +162,9 @@ func (c *rpcClient) HeartRequestC2M(metadata *metadata.Metadata, sub *client.Sub req.RpcHeader = &protocol.RpcConnHeader{ Flag: proto.Int32(0), } - rsp, err := c.client.DoRequest(c.ctx, req) + ctx, cancel := context.WithTimeout(context.Background(), c.config.Net.ReadTimeout) + defer cancel() + rsp, err := c.client.DoRequest(ctx, req) if v, ok := rsp.(*codec.TubeMQRPCResponse); ok { if v.ResponseException != nil { return nil, errs.New(errs.RetResponseException, err.Error()) @@ -196,7 +203,9 @@ func (c *rpcClient) CloseRequestC2M(metadata *metadata.Metadata, sub *client.Sub req.RpcHeader = &protocol.RpcConnHeader{ Flag: proto.Int32(0), } - rsp, err := c.client.DoRequest(c.ctx, req) + ctx, cancel := context.WithTimeout(context.Background(), c.config.Net.ReadTimeout) + defer cancel() + rsp, err := c.client.DoRequest(ctx, req) if v, ok 
:= rsp.(*codec.TubeMQRPCResponse); ok { if v.ResponseException != nil { return nil, errs.New(errs.RetResponseException, err.Error())
