This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch INLONG-25
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git

commit c6d3f751be782e5f62eb1f46b64366f169be8e38
Author: Zijie Lu <[email protected]>
AuthorDate: Sat May 1 10:04:27 2021 +0800

    Rename and add some comments
    
    Signed-off-by: Zijie Lu <[email protected]>
---
 .../tubemq-client-go/codec/codec.go                | 16 ++++
 tubemq-client-twins/tubemq-client-go/go.sum        | 11 +++
 .../multiplexing.go}                               | 98 +++++++++++++---------
 .../multlplexing_test.go}                          |  6 +-
 4 files changed, 88 insertions(+), 43 deletions(-)

diff --git a/tubemq-client-twins/tubemq-client-go/codec/codec.go 
b/tubemq-client-twins/tubemq-client-go/codec/codec.go
index a6f3fee..b9ee1d7 100644
--- a/tubemq-client-twins/tubemq-client-go/codec/codec.go
+++ b/tubemq-client-twins/tubemq-client-go/codec/codec.go
@@ -15,6 +15,9 @@
  * limitations under the License.
  */
 
+// Package codec defines the encoding and decoding logic for communicating with TubeMQ.
+// If the protocol of encoding and decoding is changed, only this package
+// will need to be changed.
 package codec
 
 import (
@@ -36,20 +39,27 @@ const (
        beginTokenLen         uint32 = 4
 )
 
+// TransportResponse is the abstraction of the transport response.
 type TransportResponse interface {
+       // GetSerialNo returns the `serialNo` of the corresponding request.
        GetSerialNo() uint32
+       // GetResponseBuf returns the body of the response.
        GetResponseBuf() []byte
 }
 
+// Decoder is the abstraction of the decoder which is used to decode the response.
 type Decoder interface {
+       // Decode will decode the response to frame head and body.
        Decode() (TransportResponse, error)
 }
 
+// TubeMQDecoder is the implementation of the decoder of response from TubeMQ.
 type TubeMQDecoder struct {
        reader io.Reader
        msg    []byte
 }
 
+// New will return a default TubeMQDecoder.
 func New(reader io.Reader) *TubeMQDecoder {
        bufferReader := bufio.NewReaderSize(reader, maxBufferSize)
        return &TubeMQDecoder{
@@ -58,6 +68,8 @@ func New(reader io.Reader) *TubeMQDecoder {
        }
 }
 
+// Decode will decode the response from TubeMQ to TransportResponse according to
+// the RPC protocol of TubeMQ.
 func (t *TubeMQDecoder) Decode() (TransportResponse, error) {
        num, err := io.ReadFull(t.reader, t.msg[:frameHeadLen])
        if err != nil {
@@ -112,20 +124,24 @@ func (t *TubeMQDecoder) Decode() (TransportResponse, 
error) {
        }, nil
 }
 
+// TubeMQRequest is the implementation of TubeMQ request.
 type TubeMQRequest struct {
        serialNo uint32
        req      []byte
 }
 
+// TubeMQResponse is the TubeMQ implementation of TransportResponse.
 type TubeMQResponse struct {
        serialNo    uint32
        responseBuf []byte
 }
 
+// GetSerialNo will return the SerialNo of TubeMQResponse.
 func (t TubeMQResponse) GetSerialNo() uint32 {
        return t.serialNo
 }
 
+// GetResponseBuf will return the body of TubeMQResponse.
 func (t TubeMQResponse) GetResponseBuf() []byte {
        return t.responseBuf
 }
diff --git a/tubemq-client-twins/tubemq-client-go/go.sum 
b/tubemq-client-twins/tubemq-client-go/go.sum
new file mode 100644
index 0000000..acb88a4
--- /dev/null
+++ b/tubemq-client-twins/tubemq-client-go/go.sum
@@ -0,0 +1,11 @@
+github.com/davecgh/go-spew v1.1.0 
h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8=
+github.com/davecgh/go-spew v1.1.0/go.mod 
h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
+github.com/pmezard/go-difflib v1.0.0 
h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
+github.com/pmezard/go-difflib v1.0.0/go.mod 
h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
+github.com/stretchr/objx v0.1.0/go.mod 
h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
+github.com/stretchr/testify v1.7.0 
h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY=
+github.com/stretchr/testify v1.7.0/go.mod 
h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
+gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 
h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
+gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod 
h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
+gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c 
h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo=
+gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod 
h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
diff --git a/tubemq-client-twins/tubemq-client-go/multiplexed/multiplexed.go 
b/tubemq-client-twins/tubemq-client-go/multiplexing/multiplexing.go
similarity index 75%
rename from tubemq-client-twins/tubemq-client-go/multiplexed/multiplexed.go
rename to tubemq-client-twins/tubemq-client-go/multiplexing/multiplexing.go
index 3792035..06e09d2 100644
--- a/tubemq-client-twins/tubemq-client-go/multiplexed/multiplexed.go
+++ b/tubemq-client-twins/tubemq-client-go/multiplexing/multiplexing.go
@@ -15,11 +15,11 @@
  * limitations under the License.
  */
 
-// Package multiplexed defines the multiplexed connection pool for sending
-// request and receiving response. After receiving the response, it will
-// be decoded and returned to the client. It is used for the underlying 
communication
+// Package multiplexing defines the multiplex connection pool for sending
+// request and receiving response. After receiving the response, the decoded
+// response will be returned to the client. It is used for the communication
 // with TubeMQ.
-package multiplexed
+package multiplexing
 
 import (
        "context"
@@ -35,16 +35,17 @@ import (
 )
 
 var (
-       // ErrConnClosed indicates that the connection is closed
-       ErrConnClosed = errors.New("connection is closed")
-       // ErrChanClose indicates the recv chan is closed
-       ErrChanClose = errors.New("unexpected recv chan close")
+       // ErrConnClosed indicates the connection has been closed
+       ErrConnClosed = errors.New("connection has been closed")
+       // ErrChanClosed indicates the recv chan has been closed
+       ErrChanClosed = errors.New("unexpected recv chan closing")
        // ErrWriteBufferDone indicates write buffer done
        ErrWriteBufferDone = errors.New("write buffer done")
-       // ErrAssertConnectionFail indicates connection assertion error
-       ErrAssertConnectionFail = errors.New("assert connection slice fail")
+       // ErrAssertConnection indicates connection assertion error
+       ErrAssertConnection = errors.New("connection assertion error")
 )
 
+// The state of the connection.
 const (
        Initial int = iota
        Connected
@@ -54,10 +55,12 @@ const (
 
 const queueSize = 10000
 
+// Pool maintains the multiplex connections of different addresses.
 type Pool struct {
        connections *sync.Map
 }
 
+// NewPool will construct a default multiplex connections pool.
 func NewPool() *Pool {
        m := &Pool{
                connections: new(sync.Map),
@@ -65,7 +68,10 @@ func NewPool() *Pool {
        return m
 }
 
-func (p *Pool) Get(ctx context.Context, address string, serialNo uint32) 
(*MultiplexedConnection, error) {
+// Get will return a multiplex connection.
+// 1. If no underlying TCP connection has been created, a TCP connection will be created first.
+// 2. A new multiplex connection with the serialNo will be created and returned.
+func (p *Pool) Get(ctx context.Context, address string, serialNo uint32) 
(*MultiplexConnection, error) {
        select {
        case <-ctx.Done():
                return nil, ctx.Err()
@@ -76,12 +82,12 @@ func (p *Pool) Get(ctx context.Context, address string, 
serialNo uint32) (*Multi
                if c, ok := v.(*Connection); ok {
                        return c.new(ctx, serialNo)
                }
-               return nil, ErrAssertConnectionFail
+               return nil, ErrAssertConnection
        }
 
        c := &Connection{
                address:     address,
-               connections: make(map[uint32]*MultiplexedConnection),
+               connections: make(map[uint32]*MultiplexConnection),
                done:        make(chan struct{}),
                mDone:       make(chan struct{}),
                state:       Initial,
@@ -100,38 +106,28 @@ func (p *Pool) Get(ctx context.Context, address string, 
serialNo uint32) (*Multi
        c.decoder = codec.New(conn)
        c.conn = conn
        c.state = Connected
+       c.pool = p
        go c.reader()
        go c.writer()
        return c.new(ctx, serialNo)
 }
 
-type writerBuffer struct {
-       buffer chan []byte
-       done   <-chan struct{}
-}
-
-func (w *writerBuffer) get() ([]byte, error) {
-       select {
-       case req := <-w.buffer:
-               return req, nil
-       case <-w.done:
-               return nil, ErrWriteBufferDone
-       }
-}
-
 type recvReader struct {
        ctx  context.Context
        recv chan codec.TransportResponse
 }
 
-type MultiplexedConnection struct {
+// MultiplexConnection is used to multiplex a TCP connection.
+// It is distinguished by the serialNo.
+type MultiplexConnection struct {
        serialNo uint32
        conn     *Connection
        reader   *recvReader
        done     chan struct{}
 }
 
-func (mc *MultiplexedConnection) Write(b []byte) error {
+// Write uses the underlying TCP connection to send the bytes.
+func (mc *MultiplexConnection) Write(b []byte) error {
        if err := mc.conn.send(b); err != nil {
                mc.conn.remove(mc.serialNo)
                return err
@@ -139,7 +135,8 @@ func (mc *MultiplexedConnection) Write(b []byte) error {
        return nil
 }
 
-func (mc *MultiplexedConnection) Read() (codec.TransportResponse, error) {
+// Read returns the response from the multiplex connection.
+func (mc *MultiplexConnection) Read() (codec.TransportResponse, error) {
        select {
        case <-mc.reader.ctx.Done():
                mc.conn.remove(mc.serialNo)
@@ -151,17 +148,19 @@ func (mc *MultiplexedConnection) Read() 
(codec.TransportResponse, error) {
                if mc.conn.err != nil {
                        return nil, mc.conn.err
                }
-               return nil, ErrChanClose
+               return nil, ErrChanClosed
        case <-mc.done:
                return nil, mc.conn.err
        }
 }
 
-func (mc *MultiplexedConnection) recv(rsp *codec.TubeMQResponse) {
+func (mc *MultiplexConnection) recv(rsp *codec.TubeMQResponse) {
        mc.reader.recv <- rsp
        mc.conn.remove(rsp.GetSerialNo())
 }
 
+// DialOptions represents the dial options of the TCP connection.
+// If TLS is not enabled, the configuration of TLS can be ignored.
 type DialOptions struct {
        Network       string
        Address       string
@@ -172,11 +171,13 @@ type DialOptions struct {
        TLSServerName string
 }
 
+// Connection represents the underlying TCP connection of the multiplex connections of an address
+// and maintains the multiplex connections.
 type Connection struct {
        err         error
        address     string
        mu          sync.RWMutex
-       connections map[uint32]*MultiplexedConnection
+       connections map[uint32]*MultiplexConnection
        decoder     codec.Decoder
        conn        net.Conn
        done        chan struct{}
@@ -187,14 +188,14 @@ type Connection struct {
        pool        *Pool
 }
 
-func (c *Connection) new(ctx context.Context, serialNo uint32) 
(*MultiplexedConnection, error) {
+func (c *Connection) new(ctx context.Context, serialNo uint32) 
(*MultiplexConnection, error) {
        c.mu.Lock()
        defer c.mu.Unlock()
        if c.err != nil {
                return nil, c.err
        }
 
-       vc := &MultiplexedConnection{
+       mc := &MultiplexConnection{
                serialNo: serialNo,
                conn:     c,
                done:     c.mDone,
@@ -204,11 +205,11 @@ func (c *Connection) new(ctx context.Context, serialNo 
uint32) (*MultiplexedConn
                },
        }
 
-       if prevConn, ok := c.connections[serialNo]; ok {
-               close(prevConn.reader.recv)
+       if lastConn, ok := c.connections[serialNo]; ok {
+               close(lastConn.reader.recv)
        }
-       c.connections[serialNo] = vc
-       return vc, nil
+       c.connections[serialNo] = mc
+       return mc, nil
 }
 
 func (c *Connection) close(lastErr error, done chan struct{}) {
@@ -230,7 +231,7 @@ func (c *Connection) close(lastErr error, done chan 
struct{}) {
 
        c.state = Closing
        c.err = lastErr
-       c.connections = make(map[uint32]*MultiplexedConnection)
+       c.connections = make(map[uint32]*MultiplexConnection)
        close(c.done)
        if c.conn != nil {
                c.conn.Close()
@@ -311,6 +312,9 @@ func (c *Connection) write(b []byte) error {
        return nil
 }
 
+// The response handling logic of the TCP connection.
+// 1. Read from the connection and decode it to the TransportResponse.
+// 2. Send the response to the corresponding multiplex connection based on the serialNo.
 func (c *Connection) reader() {
        var lastErr error
        for {
@@ -337,6 +341,20 @@ func (c *Connection) reader() {
        c.close(lastErr, c.done)
 }
 
+type writerBuffer struct {
+       buffer chan []byte
+       done   <-chan struct{}
+}
+
+func (w *writerBuffer) get() ([]byte, error) {
+       select {
+       case req := <-w.buffer:
+               return req, nil
+       case <-w.done:
+               return nil, ErrWriteBufferDone
+       }
+}
+
 func dial(ctx context.Context, address string) (net.Conn, *DialOptions, error) 
{
        var timeout time.Duration
        t, ok := ctx.Deadline()
diff --git 
a/tubemq-client-twins/tubemq-client-go/multiplexed/multlplexed_test.go 
b/tubemq-client-twins/tubemq-client-go/multiplexing/multlplexing_test.go
similarity index 96%
rename from tubemq-client-twins/tubemq-client-go/multiplexed/multlplexed_test.go
rename to tubemq-client-twins/tubemq-client-go/multiplexing/multlplexing_test.go
index 4584607..af1d416 100644
--- a/tubemq-client-twins/tubemq-client-go/multiplexed/multlplexed_test.go
+++ b/tubemq-client-twins/tubemq-client-go/multiplexing/multlplexing_test.go
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package multiplexed
+package multiplexing
 
 import (
        "bytes"
@@ -87,7 +87,7 @@ func Encode(serialNo uint32, body []byte) ([]byte, error) {
        return buf.Bytes(), nil
 }
 
-func TestBasicMultiplexed(t *testing.T) {
+func TestBasicMultiplexing(t *testing.T) {
        serialNo := atomic.AddUint32(&serialNo, 1)
        ctx, cancel := context.WithTimeout(context.Background(), time.Second)
        defer cancel()
@@ -107,7 +107,7 @@ func TestBasicMultiplexed(t *testing.T) {
        assert.Equal(t, mc.Write(nil), nil)
 }
 
-func TestConcurrentMultiplexed(t *testing.T) {
+func TestConcurrentMultiplexing(t *testing.T) {
        count := 1000
        m := NewPool()
        wg := sync.WaitGroup{}

Reply via email to