This is an automated email from the ASF dual-hosted git repository. gosonzhang pushed a commit to branch INLONG-25 in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
commit 6b64b52905cba23c4f6b7fe892f69f81b7cc6032 Author: Zijie Lu <[email protected]> AuthorDate: Thu May 6 10:44:51 2021 +0800 Address review comments Signed-off-by: Zijie Lu <[email protected]> --- .../tubemq-client-go/codec/codec.go | 119 +-------------------- .../tubemq-client-go/multiplexing/multiplexing.go | 16 +-- .../multiplexing/multlplexing_test.go | 4 +- 3 files changed, 15 insertions(+), 124 deletions(-) diff --git a/tubemq-client-twins/tubemq-client-go/codec/codec.go b/tubemq-client-twins/tubemq-client-go/codec/codec.go index fb5c945..4af8b5d 100644 --- a/tubemq-client-twins/tubemq-client-go/codec/codec.go +++ b/tubemq-client-twins/tubemq-client-go/codec/codec.go @@ -20,125 +20,16 @@ // will need to be changed. package codec -import ( - "bufio" - "encoding/binary" - "errors" - "io" -) - -const ( - RPCProtocolBeginToken uint32 = 0xFF7FF4FE - RPCMaxBufferSize uint32 = 8192 - frameHeadLen uint32 = 12 - maxBufferSize int = 128 * 1024 - defaultMsgSize int = 4096 - dataLen uint32 = 4 - listSizeLen uint32 = 4 - serialNoLen uint32 = 4 - beginTokenLen uint32 = 4 -) - -// TransportResponse is the abstraction of the transport response. -type TransportResponse interface { +// Response is the abstraction of the transport response. +type Response interface { // GetSerialNo returns the `serialNo` of the corresponding request. GetSerialNo() uint32 - // GetResponseBuf returns the body of the response. - GetResponseBuf() []byte + // GetBuffer returns the body of the response. + GetBuffer() []byte } // Decoder is the abstraction of the decoder which is used to decode the response. type Decoder interface { // Decode will decode the response to frame head and body. - Decode() (TransportResponse, error) -} - -// TubeMQDecoder is the implementation of the decoder of response from TubeMQ. -type TubeMQDecoder struct { - reader io.Reader - msg []byte -} - -// New will return a default TubeMQDecoder. -func New(reader io.Reader) *TubeMQDecoder { - bufferReader := bufio.NewReaderSize(reader, maxBufferSize) - return &TubeMQDecoder{ - msg: make([]byte, defaultMsgSize), - reader: bufferReader, - } -} - -// Decode will decode the response from TubeMQ to TransportResponse according to -// the RPC protocol of TubeMQ. -func (t *TubeMQDecoder) Decode() (TransportResponse, error) { - num, err := io.ReadFull(t.reader, t.msg[:frameHeadLen]) - if err != nil { - return nil, err - } - if num != int(frameHeadLen) { - return nil, errors.New("framer: read frame header num invalid") - } - token := binary.BigEndian.Uint32(t.msg[:beginTokenLen]) - if token != RPCProtocolBeginToken { - return nil, errors.New("framer: read framer rpc protocol begin token not match") - } - serialNo := binary.BigEndian.Uint32(t.msg[beginTokenLen : beginTokenLen+serialNoLen]) - listSize := binary.BigEndian.Uint32(t.msg[beginTokenLen+serialNoLen : beginTokenLen+serialNoLen+listSizeLen]) - totalLen := int(frameHeadLen) - size := make([]byte, 4) - for i := 0; i < int(listSize); i++ { - n, err := io.ReadFull(t.reader, size) - if err != nil { - return nil, err - } - if n != int(dataLen) { - return nil, errors.New("framer: read invalid size") - } - - s := int(binary.BigEndian.Uint32(size)) - if totalLen+s > len(t.msg) { - data := t.msg[:totalLen] - t.msg = make([]byte, totalLen+s) - copy(t.msg, data[:]) - } - - num, err = io.ReadFull(t.reader, t.msg[totalLen:totalLen+s]) - if err != nil { - return nil, err - } - if num != s { - return nil, errors.New("framer: read invalid data") - } - totalLen += s - } - - data := make([]byte, totalLen-int(frameHeadLen)) - copy(data, t.msg[frameHeadLen:totalLen]) - - return TubeMQResponse{ - serialNo: serialNo, - responseBuf: data, - }, nil -} - -// TubeMQRequest is the implementation of TubeMQ request. -type TubeMQRequest struct { - serialNo uint32 - req []byte -} - -// TubeMQResponse is the TubeMQ implementation of TransportResponse. -type TubeMQResponse struct { - serialNo uint32 - responseBuf []byte -} - -// GetSerialNo will return the SerialNo of TubeMQResponse. -func (t TubeMQResponse) GetSerialNo() uint32 { - return t.serialNo -} - -// GetResponseBuf will return the body of TubeMQResponse. -func (t TubeMQResponse) GetResponseBuf() []byte { - return t.responseBuf + Decode() (Response, error) } diff --git a/tubemq-client-twins/tubemq-client-go/multiplexing/multiplexing.go b/tubemq-client-twins/tubemq-client-go/multiplexing/multiplexing.go index 783c8e8..825636a 100644 --- a/tubemq-client-twins/tubemq-client-go/multiplexing/multiplexing.go +++ b/tubemq-client-twins/tubemq-client-go/multiplexing/multiplexing.go @@ -98,7 +98,7 @@ func (p *Pool) Get(ctx context.Context, address string, serialNo uint32, opts *D } p.connections.Store(address, c) - conn, dialOpts, err := dial(ctx, address, opts) + conn, dialOpts, err := dial(ctx, opts) c.dialOpts = dialOpts if err != nil { return nil, err @@ -112,7 +112,7 @@ func (p *Pool) Get(ctx context.Context, address string, serialNo uint32, opts *D return c.new(ctx, serialNo) } -func dial(ctx context.Context, address string, opts *DialOptions) (net.Conn, *DialOptions, error) { +func dial(ctx context.Context, opts *DialOptions) (net.Conn, *DialOptions, error) { var timeout time.Duration t, ok := ctx.Deadline() if ok { @@ -177,7 +177,7 @@ func getCertPool(caCertFile string) (*x509.CertPool, error) { type recvReader struct { ctx context.Context - recv chan codec.TransportResponse + recv chan codec.Response } // MultiplexConnection is used to multiplex a TCP connection. @@ -199,7 +199,7 @@ func (mc *MultiplexConnection) Write(b []byte) error { } // Read returns the response from the multiplex connection. -func (mc *MultiplexConnection) Read() (codec.TransportResponse, error) { +func (mc *MultiplexConnection) Read() (codec.Response, error) { select { case <-mc.reader.ctx.Done(): mc.conn.remove(mc.serialNo) @@ -217,7 +217,7 @@ func (mc *MultiplexConnection) Read() (codec.TransportResponse, error) { } } -func (mc *MultiplexConnection) recv(rsp *codec.TubeMQResponse) { +func (mc *MultiplexConnection) recv(rsp codec.Response) { mc.reader.recv <- rsp mc.conn.remove(rsp.GetSerialNo()) } @@ -264,7 +264,7 @@ func (c *Connection) new(ctx context.Context, serialNo uint32) (*MultiplexConnec done: c.mDone, reader: &recvReader{ ctx: ctx, - recv: make(chan codec.TransportResponse, 1), + recv: make(chan codec.Response, 1), }, } @@ -324,7 +324,7 @@ func (c *Connection) reconnect() error { } // The response handling logic of the TCP connection. -// 1. Read from the connection and decode it to the TransportResponse. +// 1. Read from the connection and decode it to the Response. // 2. Send the response to the corresponding multiplex connection based on the serialNo. func (c *Connection) reader() { var lastErr error @@ -347,7 +347,7 @@ func (c *Connection) reader() { continue } mc.reader.recv <- rsp - mc.conn.remove(rsp.GetSerialNo()) + mc.conn.remove(serialNo) } c.close(lastErr, c.done) } diff --git a/tubemq-client-twins/tubemq-client-go/multiplexing/multlplexing_test.go b/tubemq-client-twins/tubemq-client-go/multiplexing/multlplexing_test.go index 0c6d6b9..fbeb8c4 100644 --- a/tubemq-client-twins/tubemq-client-go/multiplexing/multlplexing_test.go +++ b/tubemq-client-twins/tubemq-client-go/multiplexing/multlplexing_test.go @@ -107,7 +107,7 @@ func TestBasicMultiplexing(t *testing.T) { rsp, err := mc.Read() assert.Nil(t, err) assert.Equal(t, serialNo, rsp.GetSerialNo()) - assert.Equal(t, body, rsp.GetResponseBuf()) + assert.Equal(t, body, rsp.GetBuffer()) assert.Equal(t, mc.Write(nil), nil) } @@ -137,7 +137,7 @@ func TestConcurrentMultiplexing(t *testing.T) { rsp, err := mc.Read() assert.Nil(t, err) assert.Equal(t, serialNo, rsp.GetSerialNo()) - assert.Equal(t, body, rsp.GetResponseBuf()) + assert.Equal(t, body, rsp.GetBuffer()) }(i) } wg.Wait()
