This is an automated email from the ASF dual-hosted git repository. gosonzhang pushed a commit to branch INLONG-25 in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
commit 459405a5dfd214f448b05bb796baccfd76081891 Author: Zijie Lu <[email protected]> AuthorDate: Sun May 2 15:00:59 2021 +0800 nitpick for codec and dialopts Signed-off-by: Zijie Lu <[email protected]> --- tubemq-client-twins/tubemq-client-go/codec/codec.go | 11 ++++------- .../tubemq-client-go/multiplexing/multiplexing.go | 18 +++++++----------- .../tubemq-client-go/multiplexing/multlplexing_test.go | 12 ++++++++++-- 3 files changed, 21 insertions(+), 20 deletions(-) diff --git a/tubemq-client-twins/tubemq-client-go/codec/codec.go b/tubemq-client-twins/tubemq-client-go/codec/codec.go index b9ee1d7..fb5c945 100644 --- a/tubemq-client-twins/tubemq-client-go/codec/codec.go +++ b/tubemq-client-twins/tubemq-client-go/codec/codec.go @@ -30,7 +30,7 @@ import ( const ( RPCProtocolBeginToken uint32 = 0xFF7FF4FE RPCMaxBufferSize uint32 = 8192 - frameHeadLen uint32 = 8 + frameHeadLen uint32 = 12 maxBufferSize int = 128 * 1024 defaultMsgSize int = 4096 dataLen uint32 = 4 @@ -82,11 +82,8 @@ func (t *TubeMQDecoder) Decode() (TransportResponse, error) { if token != RPCProtocolBeginToken { return nil, errors.New("framer: read framer rpc protocol begin token not match") } - num, err = io.ReadFull(t.reader, t.msg[frameHeadLen:frameHeadLen+listSizeLen]) - if num != int(listSizeLen) { - return nil, errors.New("framer: read invalid list size num") - } - listSize := binary.BigEndian.Uint32(t.msg[frameHeadLen : frameHeadLen+listSizeLen]) + serialNo := binary.BigEndian.Uint32(t.msg[beginTokenLen : beginTokenLen+serialNoLen]) + listSize := binary.BigEndian.Uint32(t.msg[beginTokenLen+serialNoLen : beginTokenLen+serialNoLen+listSizeLen]) totalLen := int(frameHeadLen) size := make([]byte, 4) for i := 0; i < int(listSize); i++ { @@ -119,7 +116,7 @@ func (t *TubeMQDecoder) Decode() (TransportResponse, error) { copy(data, t.msg[frameHeadLen:totalLen]) return TubeMQResponse{ - serialNo: binary.BigEndian.Uint32(t.msg[beginTokenLen : beginTokenLen+serialNoLen]), + serialNo: serialNo, responseBuf: data, }, nil } diff --git a/tubemq-client-twins/tubemq-client-go/multiplexing/multiplexing.go b/tubemq-client-twins/tubemq-client-go/multiplexing/multiplexing.go index d1ee603..783c8e8 100644 --- a/tubemq-client-twins/tubemq-client-go/multiplexing/multiplexing.go +++ b/tubemq-client-twins/tubemq-client-go/multiplexing/multiplexing.go @@ -71,7 +71,7 @@ func NewPool() *Pool { // Get will return a multiplex connection // 1. If no underlying TCP connection has been created, a TCP connection will be created first. // 2. A new multiplex connection with the serialNo will be created and returned. -func (p *Pool) Get(ctx context.Context, address string, serialNo uint32) (*MultiplexConnection, error) { +func (p *Pool) Get(ctx context.Context, address string, serialNo uint32, opts *DialOptions) (*MultiplexConnection, error) { select { case <-ctx.Done(): return nil, ctx.Err() @@ -98,7 +98,7 @@ func (p *Pool) Get(ctx context.Context, address string, serialNo uint32) (*Multi } p.connections.Store(address, c) - conn, dialOpts, err := dial(ctx, address) + conn, dialOpts, err := dial(ctx, address, opts) c.dialOpts = dialOpts if err != nil { return nil, err @@ -112,24 +112,20 @@ func (p *Pool) Get(ctx context.Context, address string, serialNo uint32) (*Multi return c.new(ctx, serialNo) } -func dial(ctx context.Context, address string) (net.Conn, *DialOptions, error) { +func dial(ctx context.Context, address string, opts *DialOptions) (net.Conn, *DialOptions, error) { var timeout time.Duration t, ok := ctx.Deadline() if ok { timeout = t.Sub(time.Now()) } - dialOpts := &DialOptions{ - Network: "tcp", - Address: address, - Timeout: timeout, - } + opts.Timeout = timeout select { case <-ctx.Done(): - return nil, dialOpts, ctx.Err() + return nil, opts, ctx.Err() default: } - conn, err := dialWithTimeout(dialOpts) - return conn, dialOpts, err + conn, err := dialWithTimeout(opts) + return conn, opts, err } func dialWithTimeout(opts *DialOptions) (net.Conn, error) { diff --git a/tubemq-client-twins/tubemq-client-go/multiplexing/multlplexing_test.go b/tubemq-client-twins/tubemq-client-go/multiplexing/multlplexing_test.go index af1d416..0c6d6b9 100644 --- a/tubemq-client-twins/tubemq-client-go/multiplexing/multlplexing_test.go +++ b/tubemq-client-twins/tubemq-client-go/multiplexing/multlplexing_test.go @@ -93,7 +93,11 @@ func TestBasicMultiplexing(t *testing.T) { defer cancel() m := NewPool() - mc, err := m.Get(ctx, address, serialNo) + opts := &DialOptions{ + Network: "tcp", + Address: address, + } + mc, err := m.Get(ctx, address, serialNo, opts) body := []byte("hello world") buf, err := Encode(serialNo, body) @@ -118,7 +122,11 @@ func TestConcurrentMultiplexing(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), time.Second) defer cancel() serialNo := atomic.AddUint32(&serialNo, 1) - mc, err := m.Get(ctx, address, serialNo) + opts := &DialOptions{ + Network: "tcp", + Address: address, + } + mc, err := m.Get(ctx, address, serialNo, opts) assert.Nil(t, err) body := []byte("hello world" + strconv.Itoa(i))
