This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch INLONG-25
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git

commit 501b6be59173072d4cfe27c1d97bc169af9feef9
Author: Zijie Lu <[email protected]>
AuthorDate: Mon May 24 14:31:02 2021 +0800

    Add license and construct context in each rpc request.
    
    Signed-off-by: Zijie Lu <[email protected]>
---
 .../tubemq-client-go/config/config.go              | 20 ++++++++++++++++++++
 tubemq-client-twins/tubemq-client-go/rpc/broker.go | 22 +++++++++++++++++-----
 tubemq-client-twins/tubemq-client-go/rpc/client.go |  6 +-----
 tubemq-client-twins/tubemq-client-go/rpc/master.go | 15 ++++++++++++---
 4 files changed, 50 insertions(+), 13 deletions(-)

diff --git a/tubemq-client-twins/tubemq-client-go/config/config.go 
b/tubemq-client-twins/tubemq-client-go/config/config.go
index 999d87a..f290f07 100644
--- a/tubemq-client-twins/tubemq-client-go/config/config.go
+++ b/tubemq-client-twins/tubemq-client-go/config/config.go
@@ -1,10 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// Package config defines all the TubeMQ configuration options.
 package config
 
 import (
        "time"
 )
 
+// Config defines multiple configuration options.
 type Config struct {
+       // Net is the namespace for network-level properties used by Broker 
and Master.
        Net struct {
                // How long to wait for a response.
                ReadTimeout time.Duration
diff --git a/tubemq-client-twins/tubemq-client-go/rpc/broker.go 
b/tubemq-client-twins/tubemq-client-go/rpc/broker.go
index aa7b831..b2b614d 100644
--- a/tubemq-client-twins/tubemq-client-go/rpc/broker.go
+++ b/tubemq-client-twins/tubemq-client-go/rpc/broker.go
@@ -18,6 +18,8 @@
 package rpc
 
 import (
+       "context"
+
        "github.com/golang/protobuf/proto"
 
        
"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/client"
@@ -84,7 +86,9 @@ func (c *rpcClient) RegisterRequestC2B(metadata 
*metadata.Metadata, sub *client.
                Request: data,
                Timeout: proto.Int64(c.config.Net.ReadTimeout.Milliseconds()),
        }
-       rsp, err := c.client.DoRequest(c.ctx, req)
+       ctx, cancel := context.WithTimeout(context.Background(), 
c.config.Net.ReadTimeout)
+       defer cancel()
+       rsp, err := c.client.DoRequest(ctx, req)
        if v, ok := rsp.(*codec.TubeMQRPCResponse); ok {
                if v.ResponseException != nil {
                        return nil, errs.New(errs.RetResponseException, 
v.ResponseException.String())
@@ -127,7 +131,9 @@ func (c *rpcClient) UnregisterRequestC2B(metadata 
metadata.Metadata, sub *client
                Request: data,
                Timeout: proto.Int64(c.config.Net.ReadTimeout.Milliseconds()),
        }
-       rsp, err := c.client.DoRequest(c.ctx, req)
+       ctx, cancel := context.WithTimeout(context.Background(), 
c.config.Net.ReadTimeout)
+       defer cancel()
+       rsp, err := c.client.DoRequest(ctx, req)
        if v, ok := rsp.(*codec.TubeMQRPCResponse); ok {
                if v.ResponseException != nil {
                        return nil, errs.New(errs.RetResponseException, 
v.ResponseException.String())
@@ -170,7 +176,9 @@ func (c *rpcClient) GetMessageRequestC2B(metadata 
*metadata.Metadata, sub *clien
                Request: data,
                Timeout: proto.Int64(c.config.Net.ReadTimeout.Milliseconds()),
        }
-       rsp, err := c.client.DoRequest(c.ctx, req)
+       ctx, cancel := context.WithTimeout(context.Background(), 
c.config.Net.ReadTimeout)
+       defer cancel()
+       rsp, err := c.client.DoRequest(ctx, req)
        if v, ok := rsp.(*codec.TubeMQRPCResponse); ok {
                if v.ResponseException != nil {
                        return nil, errs.New(errs.RetResponseException, 
v.ResponseException.String())
@@ -211,7 +219,9 @@ func (c *rpcClient) CommitOffsetRequestC2B(metadata 
*metadata.Metadata, sub *cli
                Request: data,
                Timeout: proto.Int64(c.config.Net.ReadTimeout.Milliseconds()),
        }
-       rsp, err := c.client.DoRequest(c.ctx, req)
+       ctx, cancel := context.WithTimeout(context.Background(), 
c.config.Net.ReadTimeout)
+       defer cancel()
+       rsp, err := c.client.DoRequest(ctx, req)
        if v, ok := rsp.(*codec.TubeMQRPCResponse); ok {
                if v.ResponseException != nil {
                        return nil, errs.New(errs.RetResponseException, 
v.ResponseException.String())
@@ -257,7 +267,9 @@ func (c *rpcClient) HeartbeatRequestC2B(metadata 
*metadata.Metadata, sub *client
        req.RpcHeader = &protocol.RpcConnHeader{
                Flag: proto.Int32(0),
        }
-       rsp, err := c.client.DoRequest(c.ctx, req)
+       ctx, cancel := context.WithTimeout(context.Background(), 
c.config.Net.ReadTimeout)
+       defer cancel()
+       rsp, err := c.client.DoRequest(ctx, req)
        if v, ok := rsp.(*codec.TubeMQRPCResponse); ok {
                if v.ResponseException != nil {
                        return nil, errs.New(errs.RetResponseException, 
v.ResponseException.String())
diff --git a/tubemq-client-twins/tubemq-client-go/rpc/client.go 
b/tubemq-client-twins/tubemq-client-go/rpc/client.go
index 79a446e..46961cd 100644
--- a/tubemq-client-twins/tubemq-client-go/rpc/client.go
+++ b/tubemq-client-twins/tubemq-client-go/rpc/client.go
@@ -19,8 +19,6 @@
 package rpc
 
 import (
-       "context"
-
        
"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/client"
        
"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/config"
        
"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/metadata"
@@ -57,16 +55,14 @@ type RPCClient interface {
 type rpcClient struct {
        pool   *multiplexing.Pool
        client *transport.Client
-       ctx    context.Context
        config *config.Config
 }
 
 // New returns a default TubeMQ rpc Client
-func New(pool *multiplexing.Pool, opts *transport.Options, ctx 
context.Context, config *config.Config) RPCClient {
+func New(pool *multiplexing.Pool, opts *transport.Options, config 
*config.Config) RPCClient {
        return &rpcClient{
                pool:   pool,
                client: transport.New(opts, pool),
-               ctx:    ctx,
                config: config,
        }
 }
diff --git a/tubemq-client-twins/tubemq-client-go/rpc/master.go 
b/tubemq-client-twins/tubemq-client-go/rpc/master.go
index 4f9e7f9..38e2b68 100644
--- a/tubemq-client-twins/tubemq-client-go/rpc/master.go
+++ b/tubemq-client-twins/tubemq-client-go/rpc/master.go
@@ -18,6 +18,8 @@
 package rpc
 
 import (
+       "context"
+
        "github.com/golang/protobuf/proto"
 
        
"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/client"
@@ -89,7 +91,10 @@ func (c *rpcClient) RegisterRequestC2M(metadata 
*metadata.Metadata, sub *client.
                Request: data,
                Timeout: proto.Int64(c.config.Net.ReadTimeout.Milliseconds()),
        }
-       rsp, err := c.client.DoRequest(c.ctx, req)
+
+       ctx, cancel := context.WithTimeout(context.Background(), 
c.config.Net.ReadTimeout)
+       defer cancel()
+       rsp, err := c.client.DoRequest(ctx, req)
        if v, ok := rsp.(*codec.TubeMQRPCResponse); ok {
                if v.ResponseException != nil {
                        return nil, errs.New(errs.RetResponseException, 
err.Error())
@@ -157,7 +162,9 @@ func (c *rpcClient) HeartRequestC2M(metadata 
*metadata.Metadata, sub *client.Sub
        req.RpcHeader = &protocol.RpcConnHeader{
                Flag: proto.Int32(0),
        }
-       rsp, err := c.client.DoRequest(c.ctx, req)
+       ctx, cancel := context.WithTimeout(context.Background(), 
c.config.Net.ReadTimeout)
+       defer cancel()
+       rsp, err := c.client.DoRequest(ctx, req)
        if v, ok := rsp.(*codec.TubeMQRPCResponse); ok {
                if v.ResponseException != nil {
                        return nil, errs.New(errs.RetResponseException, 
err.Error())
@@ -196,7 +203,9 @@ func (c *rpcClient) CloseRequestC2M(metadata 
*metadata.Metadata, sub *client.Sub
        req.RpcHeader = &protocol.RpcConnHeader{
                Flag: proto.Int32(0),
        }
-       rsp, err := c.client.DoRequest(c.ctx, req)
+       ctx, cancel := context.WithTimeout(context.Background(), 
c.config.Net.ReadTimeout)
+       defer cancel()
+       rsp, err := c.client.DoRequest(ctx, req)
        if v, ok := rsp.(*codec.TubeMQRPCResponse); ok {
                if v.ResponseException != nil {
                        return nil, errs.New(errs.RetResponseException, 
err.Error())

Reply via email to