This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch INLONG-25
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git

commit 6b64b52905cba23c4f6b7fe892f69f81b7cc6032
Author: Zijie Lu <[email protected]>
AuthorDate: Thu May 6 10:44:51 2021 +0800

    Address review comments
    
    Signed-off-by: Zijie Lu <[email protected]>
---
 .../tubemq-client-go/codec/codec.go                | 119 +--------------------
 .../tubemq-client-go/multiplexing/multiplexing.go  |  16 +--
 .../multiplexing/multlplexing_test.go              |   4 +-
 3 files changed, 15 insertions(+), 124 deletions(-)

diff --git a/tubemq-client-twins/tubemq-client-go/codec/codec.go b/tubemq-client-twins/tubemq-client-go/codec/codec.go
index fb5c945..4af8b5d 100644
--- a/tubemq-client-twins/tubemq-client-go/codec/codec.go
+++ b/tubemq-client-twins/tubemq-client-go/codec/codec.go
@@ -20,125 +20,16 @@
 // will need to be changed.
 package codec
 
-import (
-       "bufio"
-       "encoding/binary"
-       "errors"
-       "io"
-)
-
-const (
-       RPCProtocolBeginToken uint32 = 0xFF7FF4FE
-       RPCMaxBufferSize      uint32 = 8192
-       frameHeadLen          uint32 = 12
-       maxBufferSize         int    = 128 * 1024
-       defaultMsgSize        int    = 4096
-       dataLen               uint32 = 4
-       listSizeLen           uint32 = 4
-       serialNoLen           uint32 = 4
-       beginTokenLen         uint32 = 4
-)
-
-// TransportResponse is the abstraction of the transport response.
-type TransportResponse interface {
+// Response is the abstraction of the transport response.
+type Response interface {
        // GetSerialNo returns the `serialNo` of the corresponding request.
        GetSerialNo() uint32
-       // GetResponseBuf returns the body of the response.
-       GetResponseBuf() []byte
+       // GetBuffer returns the body of the response.
+       GetBuffer() []byte
 }
 
 // Decoder is the abstraction of the decoder which is used to decode the response.
 type Decoder interface {
        // Decode will decode the response to frame head and body.
-       Decode() (TransportResponse, error)
-}
-
-// TubeMQDecoder is the implementation of the decoder of response from TubeMQ.
-type TubeMQDecoder struct {
-       reader io.Reader
-       msg    []byte
-}
-
-// New will return a default TubeMQDecoder.
-func New(reader io.Reader) *TubeMQDecoder {
-       bufferReader := bufio.NewReaderSize(reader, maxBufferSize)
-       return &TubeMQDecoder{
-               msg:    make([]byte, defaultMsgSize),
-               reader: bufferReader,
-       }
-}
-
-// Decode will decode the response from TubeMQ to TransportResponse according to
-// the RPC protocol of TubeMQ.
-func (t *TubeMQDecoder) Decode() (TransportResponse, error) {
-       num, err := io.ReadFull(t.reader, t.msg[:frameHeadLen])
-       if err != nil {
-               return nil, err
-       }
-       if num != int(frameHeadLen) {
-               return nil, errors.New("framer: read frame header num invalid")
-       }
-       token := binary.BigEndian.Uint32(t.msg[:beginTokenLen])
-       if token != RPCProtocolBeginToken {
-               return nil, errors.New("framer: read framer rpc protocol begin token not match")
-       }
-       serialNo := binary.BigEndian.Uint32(t.msg[beginTokenLen : beginTokenLen+serialNoLen])
-       listSize := binary.BigEndian.Uint32(t.msg[beginTokenLen+serialNoLen : beginTokenLen+serialNoLen+listSizeLen])
-       totalLen := int(frameHeadLen)
-       size := make([]byte, 4)
-       for i := 0; i < int(listSize); i++ {
-               n, err := io.ReadFull(t.reader, size)
-               if err != nil {
-                       return nil, err
-               }
-               if n != int(dataLen) {
-                       return nil, errors.New("framer: read invalid size")
-               }
-
-               s := int(binary.BigEndian.Uint32(size))
-               if totalLen+s > len(t.msg) {
-                       data := t.msg[:totalLen]
-                       t.msg = make([]byte, totalLen+s)
-                       copy(t.msg, data[:])
-               }
-
-               num, err = io.ReadFull(t.reader, t.msg[totalLen:totalLen+s])
-               if err != nil {
-                       return nil, err
-               }
-               if num != s {
-                       return nil, errors.New("framer: read invalid data")
-               }
-               totalLen += s
-       }
-
-       data := make([]byte, totalLen-int(frameHeadLen))
-       copy(data, t.msg[frameHeadLen:totalLen])
-
-       return TubeMQResponse{
-               serialNo:    serialNo,
-               responseBuf: data,
-       }, nil
-}
-
-// TubeMQRequest is the implementation of TubeMQ request.
-type TubeMQRequest struct {
-       serialNo uint32
-       req      []byte
-}
-
-// TubeMQResponse is the TubeMQ implementation of TransportResponse.
-type TubeMQResponse struct {
-       serialNo    uint32
-       responseBuf []byte
-}
-
-// GetSerialNo will return the SerialNo of TubeMQResponse.
-func (t TubeMQResponse) GetSerialNo() uint32 {
-       return t.serialNo
-}
-
-// GetResponseBuf will return the body of TubeMQResponse.
-func (t TubeMQResponse) GetResponseBuf() []byte {
-       return t.responseBuf
+       Decode() (Response, error)
 }
diff --git a/tubemq-client-twins/tubemq-client-go/multiplexing/multiplexing.go b/tubemq-client-twins/tubemq-client-go/multiplexing/multiplexing.go
index 783c8e8..825636a 100644
--- a/tubemq-client-twins/tubemq-client-go/multiplexing/multiplexing.go
+++ b/tubemq-client-twins/tubemq-client-go/multiplexing/multiplexing.go
@@ -98,7 +98,7 @@ func (p *Pool) Get(ctx context.Context, address string, serialNo uint32, opts *D
        }
        p.connections.Store(address, c)
 
-       conn, dialOpts, err := dial(ctx, address, opts)
+       conn, dialOpts, err := dial(ctx, opts)
        c.dialOpts = dialOpts
        if err != nil {
                return nil, err
@@ -112,7 +112,7 @@ func (p *Pool) Get(ctx context.Context, address string, serialNo uint32, opts *D
        return c.new(ctx, serialNo)
 }
 
-func dial(ctx context.Context, address string, opts *DialOptions) (net.Conn, *DialOptions, error) {
+func dial(ctx context.Context, opts *DialOptions) (net.Conn, *DialOptions, error) {
        var timeout time.Duration
        t, ok := ctx.Deadline()
        if ok {
@@ -177,7 +177,7 @@ func getCertPool(caCertFile string) (*x509.CertPool, error) {
 
 type recvReader struct {
        ctx  context.Context
-       recv chan codec.TransportResponse
+       recv chan codec.Response
 }
 
 // MultiplexConnection is used to multiplex a TCP connection.
@@ -199,7 +199,7 @@ func (mc *MultiplexConnection) Write(b []byte) error {
 }
 
 // Read returns the response from the multiplex connection.
-func (mc *MultiplexConnection) Read() (codec.TransportResponse, error) {
+func (mc *MultiplexConnection) Read() (codec.Response, error) {
        select {
        case <-mc.reader.ctx.Done():
                mc.conn.remove(mc.serialNo)
@@ -217,7 +217,7 @@ func (mc *MultiplexConnection) Read() (codec.TransportResponse, error) {
        }
 }
 
-func (mc *MultiplexConnection) recv(rsp *codec.TubeMQResponse) {
+func (mc *MultiplexConnection) recv(rsp codec.Response) {
        mc.reader.recv <- rsp
        mc.conn.remove(rsp.GetSerialNo())
 }
@@ -264,7 +264,7 @@ func (c *Connection) new(ctx context.Context, serialNo uint32) (*MultiplexConnec
                done:     c.mDone,
                reader: &recvReader{
                        ctx:  ctx,
-                       recv: make(chan codec.TransportResponse, 1),
+                       recv: make(chan codec.Response, 1),
                },
        }
 
@@ -324,7 +324,7 @@ func (c *Connection) reconnect() error {
 }
 
 // The response handling logic of the TCP connection.
-// 1. Read from the connection and decode it to the TransportResponse.
+// 1. Read from the connection and decode it to the Response.
 // 2. Send the response to the corresponding multiplex connection based on the serialNo.
 func (c *Connection) reader() {
        var lastErr error
@@ -347,7 +347,7 @@ func (c *Connection) reader() {
                        continue
                }
                mc.reader.recv <- rsp
-               mc.conn.remove(rsp.GetSerialNo())
+               mc.conn.remove(serialNo)
        }
        c.close(lastErr, c.done)
 }
diff --git a/tubemq-client-twins/tubemq-client-go/multiplexing/multlplexing_test.go b/tubemq-client-twins/tubemq-client-go/multiplexing/multlplexing_test.go
index 0c6d6b9..fbeb8c4 100644
--- a/tubemq-client-twins/tubemq-client-go/multiplexing/multlplexing_test.go
+++ b/tubemq-client-twins/tubemq-client-go/multiplexing/multlplexing_test.go
@@ -107,7 +107,7 @@ func TestBasicMultiplexing(t *testing.T) {
        rsp, err := mc.Read()
        assert.Nil(t, err)
        assert.Equal(t, serialNo, rsp.GetSerialNo())
-       assert.Equal(t, body, rsp.GetResponseBuf())
+       assert.Equal(t, body, rsp.GetBuffer())
        assert.Equal(t, mc.Write(nil), nil)
 }
 
@@ -137,7 +137,7 @@ func TestConcurrentMultiplexing(t *testing.T) {
                        rsp, err := mc.Read()
                        assert.Nil(t, err)
                        assert.Equal(t, serialNo, rsp.GetSerialNo())
-                       assert.Equal(t, body, rsp.GetResponseBuf())
+                       assert.Equal(t, body, rsp.GetBuffer())
                }(i)
        }
        wg.Wait()

Reply via email to