This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch INLONG-25
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git

commit 459405a5dfd214f448b05bb796baccfd76081891
Author: Zijie Lu <[email protected]>
AuthorDate: Sun May 2 15:00:59 2021 +0800

    nitpick for codec and dialopts
    
    Signed-off-by: Zijie Lu <[email protected]>
---
 tubemq-client-twins/tubemq-client-go/codec/codec.go    | 11 ++++-------
 .../tubemq-client-go/multiplexing/multiplexing.go      | 18 +++++++-----------
 .../tubemq-client-go/multiplexing/multlplexing_test.go | 12 ++++++++++--
 3 files changed, 21 insertions(+), 20 deletions(-)

diff --git a/tubemq-client-twins/tubemq-client-go/codec/codec.go 
b/tubemq-client-twins/tubemq-client-go/codec/codec.go
index b9ee1d7..fb5c945 100644
--- a/tubemq-client-twins/tubemq-client-go/codec/codec.go
+++ b/tubemq-client-twins/tubemq-client-go/codec/codec.go
@@ -30,7 +30,7 @@ import (
 const (
        RPCProtocolBeginToken uint32 = 0xFF7FF4FE
        RPCMaxBufferSize      uint32 = 8192
-       frameHeadLen          uint32 = 8
+       frameHeadLen          uint32 = 12
        maxBufferSize         int    = 128 * 1024
        defaultMsgSize        int    = 4096
        dataLen               uint32 = 4
@@ -82,11 +82,8 @@ func (t *TubeMQDecoder) Decode() (TransportResponse, error) {
        if token != RPCProtocolBeginToken {
                return nil, errors.New("framer: read framer rpc protocol begin 
token not match")
        }
-       num, err = io.ReadFull(t.reader, 
t.msg[frameHeadLen:frameHeadLen+listSizeLen])
-       if num != int(listSizeLen) {
-               return nil, errors.New("framer: read invalid list size num")
-       }
-       listSize := binary.BigEndian.Uint32(t.msg[frameHeadLen : 
frameHeadLen+listSizeLen])
+       serialNo := binary.BigEndian.Uint32(t.msg[beginTokenLen : 
beginTokenLen+serialNoLen])
+       listSize := binary.BigEndian.Uint32(t.msg[beginTokenLen+serialNoLen : 
beginTokenLen+serialNoLen+listSizeLen])
        totalLen := int(frameHeadLen)
        size := make([]byte, 4)
        for i := 0; i < int(listSize); i++ {
@@ -119,7 +116,7 @@ func (t *TubeMQDecoder) Decode() (TransportResponse, error) 
{
        copy(data, t.msg[frameHeadLen:totalLen])
 
        return TubeMQResponse{
-               serialNo:    binary.BigEndian.Uint32(t.msg[beginTokenLen : 
beginTokenLen+serialNoLen]),
+               serialNo:    serialNo,
                responseBuf: data,
        }, nil
 }
diff --git a/tubemq-client-twins/tubemq-client-go/multiplexing/multiplexing.go 
b/tubemq-client-twins/tubemq-client-go/multiplexing/multiplexing.go
index d1ee603..783c8e8 100644
--- a/tubemq-client-twins/tubemq-client-go/multiplexing/multiplexing.go
+++ b/tubemq-client-twins/tubemq-client-go/multiplexing/multiplexing.go
@@ -71,7 +71,7 @@ func NewPool() *Pool {
 // Get will return a multiplex connection
 // 1. If no underlying TCP connection has been created, a TCP connection will 
be created first.
 // 2. A new multiplex connection with the serialNo will be created and 
returned.
-func (p *Pool) Get(ctx context.Context, address string, serialNo uint32) 
(*MultiplexConnection, error) {
+func (p *Pool) Get(ctx context.Context, address string, serialNo uint32, opts 
*DialOptions) (*MultiplexConnection, error) {
        select {
        case <-ctx.Done():
                return nil, ctx.Err()
@@ -98,7 +98,7 @@ func (p *Pool) Get(ctx context.Context, address string, 
serialNo uint32) (*Multi
        }
        p.connections.Store(address, c)
 
-       conn, dialOpts, err := dial(ctx, address)
+       conn, dialOpts, err := dial(ctx, address, opts)
        c.dialOpts = dialOpts
        if err != nil {
                return nil, err
@@ -112,24 +112,20 @@ func (p *Pool) Get(ctx context.Context, address string, 
serialNo uint32) (*Multi
        return c.new(ctx, serialNo)
 }
 
-func dial(ctx context.Context, address string) (net.Conn, *DialOptions, error) 
{
+func dial(ctx context.Context, address string, opts *DialOptions) (net.Conn, 
*DialOptions, error) {
        var timeout time.Duration
        t, ok := ctx.Deadline()
        if ok {
                timeout = t.Sub(time.Now())
        }
-       dialOpts := &DialOptions{
-               Network: "tcp",
-               Address: address,
-               Timeout: timeout,
-       }
+       opts.Timeout = timeout
        select {
        case <-ctx.Done():
-               return nil, dialOpts, ctx.Err()
+               return nil, opts, ctx.Err()
        default:
        }
-       conn, err := dialWithTimeout(dialOpts)
-       return conn, dialOpts, err
+       conn, err := dialWithTimeout(opts)
+       return conn, opts, err
 }
 
 func dialWithTimeout(opts *DialOptions) (net.Conn, error) {
diff --git 
a/tubemq-client-twins/tubemq-client-go/multiplexing/multlplexing_test.go 
b/tubemq-client-twins/tubemq-client-go/multiplexing/multlplexing_test.go
index af1d416..0c6d6b9 100644
--- a/tubemq-client-twins/tubemq-client-go/multiplexing/multlplexing_test.go
+++ b/tubemq-client-twins/tubemq-client-go/multiplexing/multlplexing_test.go
@@ -93,7 +93,11 @@ func TestBasicMultiplexing(t *testing.T) {
        defer cancel()
 
        m := NewPool()
-       mc, err := m.Get(ctx, address, serialNo)
+       opts := &DialOptions{
+               Network: "tcp",
+               Address: address,
+       }
+       mc, err := m.Get(ctx, address, serialNo, opts)
        body := []byte("hello world")
 
        buf, err := Encode(serialNo, body)
@@ -118,7 +122,11 @@ func TestConcurrentMultiplexing(t *testing.T) {
                        ctx, cancel := 
context.WithTimeout(context.Background(), time.Second)
                        defer cancel()
                        serialNo := atomic.AddUint32(&serialNo, 1)
-                       mc, err := m.Get(ctx, address, serialNo)
+                       opts := &DialOptions{
+                               Network: "tcp",
+                               Address: address,
+                       }
+                       mc, err := m.Get(ctx, address, serialNo, opts)
                        assert.Nil(t, err)
 
                        body := []byte("hello world" + strconv.Itoa(i))

Reply via email to