This is an automated email from the ASF dual-hosted git repository. gosonzhang pushed a commit to branch INLONG-25 in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
commit c6d3f751be782e5f62eb1f46b64366f169be8e38 Author: Zijie Lu <[email protected]> AuthorDate: Sat May 1 10:04:27 2021 +0800 Rename and add some comments Signed-off-by: Zijie Lu <[email protected]> --- .../tubemq-client-go/codec/codec.go | 16 ++++ tubemq-client-twins/tubemq-client-go/go.sum | 11 +++ .../multiplexing.go} | 98 +++++++++++++--------- .../multlplexing_test.go} | 6 +- 4 files changed, 88 insertions(+), 43 deletions(-) diff --git a/tubemq-client-twins/tubemq-client-go/codec/codec.go b/tubemq-client-twins/tubemq-client-go/codec/codec.go index a6f3fee..b9ee1d7 100644 --- a/tubemq-client-twins/tubemq-client-go/codec/codec.go +++ b/tubemq-client-twins/tubemq-client-go/codec/codec.go @@ -15,6 +15,9 @@ * limitations under the License. */ +// Package codec defines the encoding and decoding logic between TubeMQ. +// If the protocol of encoding and decoding is changed, only this package +// will need to be changed. package codec import ( @@ -36,20 +39,27 @@ const ( beginTokenLen uint32 = 4 ) +// TransportResponse is the abstraction of the transport response. type TransportResponse interface { + // GetSerialNo returns the `serialNo` of the corresponding request. GetSerialNo() uint32 + // GetResponseBuf returns the body of the response. GetResponseBuf() []byte } +// Decoder is the abstraction of the decoder which is used to decode the response. type Decoder interface { + // Decode will decode the response to frame head and body. Decode() (TransportResponse, error) } +// TubeMQDecoder is the implementation of the decoder of response from TubeMQ. type TubeMQDecoder struct { reader io.Reader msg []byte } +// New will return a default TubeMQDecoder. func New(reader io.Reader) *TubeMQDecoder { bufferReader := bufio.NewReaderSize(reader, maxBufferSize) return &TubeMQDecoder{ @@ -58,6 +68,8 @@ func New(reader io.Reader) *TubeMQDecoder { } } +// Decode will decode the response from TubeMQ to TransportResponse according to +// the RPC protocol of TubeMQ. func (t *TubeMQDecoder) Decode() (TransportResponse, error) { num, err := io.ReadFull(t.reader, t.msg[:frameHeadLen]) if err != nil { @@ -112,20 +124,24 @@ func (t *TubeMQDecoder) Decode() (TransportResponse, error) { }, nil } +// TubeMQRequest is the implementation of TubeMQ request. type TubeMQRequest struct { serialNo uint32 req []byte } +// TubeMQResponse is the TubeMQ implementation of TransportResponse. type TubeMQResponse struct { serialNo uint32 responseBuf []byte } +// GetSerialNo will return the SerialNo of TubeMQResponse. func (t TubeMQResponse) GetSerialNo() uint32 { return t.serialNo } +// GetResponseBuf will return the body of TubeMQResponse. func (t TubeMQResponse) GetResponseBuf() []byte { return t.responseBuf } diff --git a/tubemq-client-twins/tubemq-client-go/go.sum b/tubemq-client-twins/tubemq-client-go/go.sum new file mode 100644 index 0000000..acb88a4 --- /dev/null +++ b/tubemq-client-twins/tubemq-client-go/go.sum @@ -0,0 +1,11 @@ +github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY= +github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/tubemq-client-twins/tubemq-client-go/multiplexed/multiplexed.go b/tubemq-client-twins/tubemq-client-go/multiplexing/multiplexing.go similarity index 75% rename from tubemq-client-twins/tubemq-client-go/multiplexed/multiplexed.go rename to tubemq-client-twins/tubemq-client-go/multiplexing/multiplexing.go index 3792035..06e09d2 100644 --- a/tubemq-client-twins/tubemq-client-go/multiplexed/multiplexed.go +++ b/tubemq-client-twins/tubemq-client-go/multiplexing/multiplexing.go @@ -15,11 +15,11 @@ * limitations under the License. */ -// Package multiplexed defines the multiplexed connection pool for sending -// request and receiving response. After receiving the response, it will -// be decoded and returned to the client. It is used for the underlying communication +// Package multiplexing defines the multiplex connection pool for sending +// request and receiving response. After receiving the response, the decoded +// response will be returned to the client. It is used for the communication // with TubeMQ. -package multiplexed +package multiplexing import ( "context" @@ -35,16 +35,17 @@ import ( ) var ( - // ErrConnClosed indicates that the connection is closed - ErrConnClosed = errors.New("connection is closed") - // ErrChanClose indicates the recv chan is closed - ErrChanClose = errors.New("unexpected recv chan close") + // ErrConnClosed indicates the connection has been closed + ErrConnClosed = errors.New("connection has been closed") + // ErrChanClosed indicates the recv chan has been closed + ErrChanClosed = errors.New("unexpected recv chan closing") // ErrWriteBufferDone indicates write buffer done ErrWriteBufferDone = errors.New("write buffer done") - // ErrAssertConnectionFail indicates connection assertion error - ErrAssertConnectionFail = errors.New("assert connection slice fail") + // ErrAssertConnection indicates connection assertion error + ErrAssertConnection = errors.New("connection assertion error") ) +// The state of the connection. const ( Initial int = iota Connected @@ -54,10 +55,12 @@ const ( const queueSize = 10000 +// Pool maintains the multiplex connections of different addresses. type Pool struct { connections *sync.Map } +// NewPool will construct a default multiplex connections pool. func NewPool() *Pool { m := &Pool{ connections: new(sync.Map), @@ -65,7 +68,10 @@ func NewPool() *Pool { return m } -func (p *Pool) Get(ctx context.Context, address string, serialNo uint32) (*MultiplexedConnection, error) { +// Get will return a multiplex connection +// 1. If no underlying TCP connection has been created, a TCP connection will be created first. +// 2. A new multiplex connection with the serialNo will be created and returned. +func (p *Pool) Get(ctx context.Context, address string, serialNo uint32) (*MultiplexConnection, error) { select { case <-ctx.Done(): return nil, ctx.Err() @@ -76,12 +82,12 @@ func (p *Pool) Get(ctx context.Context, address string, serialNo uint32) (*Multi if c, ok := v.(*Connection); ok { return c.new(ctx, serialNo) } - return nil, ErrAssertConnectionFail + return nil, ErrAssertConnection } c := &Connection{ address: address, - connections: make(map[uint32]*MultiplexedConnection), + connections: make(map[uint32]*MultiplexConnection), done: make(chan struct{}), mDone: make(chan struct{}), state: Initial, @@ -100,38 +106,28 @@ func (p *Pool) Get(ctx context.Context, address string, serialNo uint32) (*Multi c.decoder = codec.New(conn) c.conn = conn c.state = Connected + c.pool = p go c.reader() go c.writer() return c.new(ctx, serialNo) } -type writerBuffer struct { - buffer chan []byte - done <-chan struct{} -} - -func (w *writerBuffer) get() ([]byte, error) { - select { - case req := <-w.buffer: - return req, nil - case <-w.done: - return nil, ErrWriteBufferDone - } -} - type recvReader struct { ctx context.Context recv chan codec.TransportResponse } -type MultiplexedConnection struct { +// MultiplexConnection is used to multiplex a TCP connection. +// It is distinguished by the serialNo. +type MultiplexConnection struct { serialNo uint32 conn *Connection reader *recvReader done chan struct{} } -func (mc *MultiplexedConnection) Write(b []byte) error { +// Write uses the underlying TCP connection to send the bytes. +func (mc *MultiplexConnection) Write(b []byte) error { if err := mc.conn.send(b); err != nil { mc.conn.remove(mc.serialNo) return err @@ -139,7 +135,8 @@ func (mc *MultiplexedConnection) Write(b []byte) error { return nil } -func (mc *MultiplexedConnection) Read() (codec.TransportResponse, error) { +// Read returns the response from the multiplex connection. +func (mc *MultiplexConnection) Read() (codec.TransportResponse, error) { select { case <-mc.reader.ctx.Done(): mc.conn.remove(mc.serialNo) @@ -151,17 +148,19 @@ func (mc *MultiplexedConnection) Read() (codec.TransportResponse, error) { if mc.conn.err != nil { return nil, mc.conn.err } - return nil, ErrChanClose + return nil, ErrChanClosed case <-mc.done: return nil, mc.conn.err } } -func (mc *MultiplexedConnection) recv(rsp *codec.TubeMQResponse) { +func (mc *MultiplexConnection) recv(rsp *codec.TubeMQResponse) { mc.reader.recv <- rsp mc.conn.remove(rsp.GetSerialNo()) } +// DialOptions represents the dail options of the TCP connection. +// If TLS is not enabled, the configuration of TLS can be ignored. type DialOptions struct { Network string Address string @@ -172,11 +171,13 @@ type DialOptions struct { TLSServerName string } +// Connection represents the underlying TCP connection of the multiplex connections of an address +// and maintains the multiplex connections. type Connection struct { err error address string mu sync.RWMutex - connections map[uint32]*MultiplexedConnection + connections map[uint32]*MultiplexConnection decoder codec.Decoder conn net.Conn done chan struct{} @@ -187,14 +188,14 @@ type Connection struct { pool *Pool } -func (c *Connection) new(ctx context.Context, serialNo uint32) (*MultiplexedConnection, error) { +func (c *Connection) new(ctx context.Context, serialNo uint32) (*MultiplexConnection, error) { c.mu.Lock() defer c.mu.Unlock() if c.err != nil { return nil, c.err } - vc := &MultiplexedConnection{ + mc := &MultiplexConnection{ serialNo: serialNo, conn: c, done: c.mDone, @@ -204,11 +205,11 @@ func (c *Connection) new(ctx context.Context, serialNo uint32) (*MultiplexedConn }, } - if prevConn, ok := c.connections[serialNo]; ok { - close(prevConn.reader.recv) + if lastConn, ok := c.connections[serialNo]; ok { + close(lastConn.reader.recv) } - c.connections[serialNo] = vc - return vc, nil + c.connections[serialNo] = mc + return mc, nil } func (c *Connection) close(lastErr error, done chan struct{}) { @@ -230,7 +231,7 @@ func (c *Connection) close(lastErr error, done chan struct{}) { c.state = Closing c.err = lastErr - c.connections = make(map[uint32]*MultiplexedConnection) + c.connections = make(map[uint32]*MultiplexConnection) close(c.done) if c.conn != nil { c.conn.Close() @@ -311,6 +312,9 @@ func (c *Connection) write(b []byte) error { return nil } +// The response handling logic of the TCP connection. +// 1. Read from the connection and decode it to the TransportResponse. +// 2. Send the response to the corresponding multiplex connection based on the serialNo. func (c *Connection) reader() { var lastErr error for { @@ -337,6 +341,20 @@ func (c *Connection) reader() { c.close(lastErr, c.done) } +type writerBuffer struct { + buffer chan []byte + done <-chan struct{} +} + +func (w *writerBuffer) get() ([]byte, error) { + select { + case req := <-w.buffer: + return req, nil + case <-w.done: + return nil, ErrWriteBufferDone + } +} + func dial(ctx context.Context, address string) (net.Conn, *DialOptions, error) { var timeout time.Duration t, ok := ctx.Deadline() diff --git a/tubemq-client-twins/tubemq-client-go/multiplexed/multlplexed_test.go b/tubemq-client-twins/tubemq-client-go/multiplexing/multlplexing_test.go similarity index 96% rename from tubemq-client-twins/tubemq-client-go/multiplexed/multlplexed_test.go rename to tubemq-client-twins/tubemq-client-go/multiplexing/multlplexing_test.go index 4584607..af1d416 100644 --- a/tubemq-client-twins/tubemq-client-go/multiplexed/multlplexed_test.go +++ b/tubemq-client-twins/tubemq-client-go/multiplexing/multlplexing_test.go @@ -15,7 +15,7 @@ * limitations under the License. */ -package multiplexed +package multiplexing import ( "bytes" @@ -87,7 +87,7 @@ func Encode(serialNo uint32, body []byte) ([]byte, error) { return buf.Bytes(), nil } -func TestBasicMultiplexed(t *testing.T) { +func TestBasicMultiplexing(t *testing.T) { serialNo := atomic.AddUint32(&serialNo, 1) ctx, cancel := context.WithTimeout(context.Background(), time.Second) defer cancel() @@ -107,7 +107,7 @@ func TestBasicMultiplexed(t *testing.T) { assert.Equal(t, mc.Write(nil), nil) } -func TestConcurrentMultiplexed(t *testing.T) { +func TestConcurrentMultiplexing(t *testing.T) { count := 1000 m := NewPool() wg := sync.WaitGroup{}
