This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch INLONG-25
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git

commit a1c86bccac13bb68d58eebb360b3610fa46eb96b
Author: Zijie Lu <[email protected]>
AuthorDate: Sat May 1 11:15:35 2021 +0800

    nitpick
    
    Signed-off-by: Zijie Lu <[email protected]>
---
 .../tubemq-client-go/multiplexing/multiplexing.go  | 216 ++++++++++-----------
 1 file changed, 108 insertions(+), 108 deletions(-)

diff --git a/tubemq-client-twins/tubemq-client-go/multiplexing/multiplexing.go b/tubemq-client-twins/tubemq-client-go/multiplexing/multiplexing.go
index 06e09d2..d1ee603 100644
--- a/tubemq-client-twins/tubemq-client-go/multiplexing/multiplexing.go
+++ b/tubemq-client-twins/tubemq-client-go/multiplexing/multiplexing.go
@@ -112,6 +112,73 @@ func (p *Pool) Get(ctx context.Context, address string, serialNo uint32) (*Multi
        return c.new(ctx, serialNo)
 }
 
+func dial(ctx context.Context, address string) (net.Conn, *DialOptions, error) {
+       var timeout time.Duration
+       t, ok := ctx.Deadline()
+       if ok {
+               timeout = t.Sub(time.Now())
+       }
+       dialOpts := &DialOptions{
+               Network: "tcp",
+               Address: address,
+               Timeout: timeout,
+       }
+       select {
+       case <-ctx.Done():
+               return nil, dialOpts, ctx.Err()
+       default:
+       }
+       conn, err := dialWithTimeout(dialOpts)
+       return conn, dialOpts, err
+}
+
+func dialWithTimeout(opts *DialOptions) (net.Conn, error) {
+       if len(opts.CACertFile) == 0 {
+               return net.DialTimeout(opts.Network, opts.Address, opts.Timeout)
+       }
+
+       tlsConf := &tls.Config{}
+       if opts.CACertFile == "none" {
+               tlsConf.InsecureSkipVerify = true
+       } else {
+               if len(opts.TLSServerName) == 0 {
+                       opts.TLSServerName = opts.Address
+               }
+               tlsConf.ServerName = opts.TLSServerName
+               certPool, err := getCertPool(opts.CACertFile)
+               if err != nil {
+                       return nil, err
+               }
+
+               tlsConf.RootCAs = certPool
+
+               if len(opts.TLSCertFile) != 0 {
+                       cert, err := tls.LoadX509KeyPair(opts.TLSCertFile, opts.TLSKeyFile)
+                       if err != nil {
+                               return nil, err
+                       }
+                       tlsConf.Certificates = []tls.Certificate{cert}
+               }
+       }
+       return tls.DialWithDialer(&net.Dialer{Timeout: opts.Timeout}, opts.Network, opts.Address, tlsConf)
+}
+
+func getCertPool(caCertFile string) (*x509.CertPool, error) {
+       if caCertFile != "root" {
+               ca, err := ioutil.ReadFile(caCertFile)
+               if err != nil {
+                       return nil, err
+               }
+               certPool := x509.NewCertPool()
+               ok := certPool.AppendCertsFromPEM(ca)
+               if !ok {
+                       return nil, err
+               }
+               return certPool, nil
+       }
+       return nil, nil
+}
+
 type recvReader struct {
        ctx  context.Context
        recv chan codec.TransportResponse
@@ -260,6 +327,35 @@ func (c *Connection) reconnect() error {
        return nil
 }
 
+// The response handling logic of the TCP connection.
+// 1. Read from the connection and decode it to the TransportResponse.
+// 2. Send the response to the corresponding multiplex connection based on the serialNo.
+func (c *Connection) reader() {
+       var lastErr error
+       for {
+               select {
+               case <-c.done:
+                       return
+               default:
+               }
+               rsp, err := c.decoder.Decode()
+               if err != nil {
+                       lastErr = err
+                       break
+               }
+               serialNo := rsp.GetSerialNo()
+               c.mu.RLock()
+               mc, ok := c.connections[serialNo]
+               c.mu.RUnlock()
+               if !ok {
+                       continue
+               }
+               mc.reader.recv <- rsp
+               mc.conn.remove(rsp.GetSerialNo())
+       }
+       c.close(lastErr, c.done)
+}
+
 func (c *Connection) writer() {
        var lastErr error
        for {
@@ -281,6 +377,18 @@ func (c *Connection) writer() {
        c.close(lastErr, c.done)
 }
 
+func (c *Connection) write(b []byte) error {
+       sent := 0
+       for sent < len(b) {
+               n, err := c.conn.Write(b[sent:])
+               if err != nil {
+                       return err
+               }
+               sent += n
+       }
+       return nil
+}
+
 func (c *Connection) send(b []byte) error {
        if c.state == Closed {
                return ErrConnClosed
@@ -300,47 +408,6 @@ func (c *Connection) remove(id uint32) {
        c.mu.Unlock()
 }
 
-func (c *Connection) write(b []byte) error {
-       sent := 0
-       for sent < len(b) {
-               n, err := c.conn.Write(b[sent:])
-               if err != nil {
-                       return err
-               }
-               sent += n
-       }
-       return nil
-}
-
-// The response handling logic of the TCP connection.
-// 1. Read from the connection and decode it to the TransportResponse.
-// 2. Send the response to the corresponding multiplex connection based on the serialNo.
-func (c *Connection) reader() {
-       var lastErr error
-       for {
-               select {
-               case <-c.done:
-                       return
-               default:
-               }
-               rsp, err := c.decoder.Decode()
-               if err != nil {
-                       lastErr = err
-                       break
-               }
-               serialNo := rsp.GetSerialNo()
-               c.mu.RLock()
-               mc, ok := c.connections[serialNo]
-               c.mu.RUnlock()
-               if !ok {
-                       continue
-               }
-               mc.reader.recv <- rsp
-               mc.conn.remove(rsp.GetSerialNo())
-       }
-       c.close(lastErr, c.done)
-}
-
 type writerBuffer struct {
        buffer chan []byte
        done   <-chan struct{}
@@ -354,70 +421,3 @@ func (w *writerBuffer) get() ([]byte, error) {
                return nil, ErrWriteBufferDone
        }
 }
-
-func dial(ctx context.Context, address string) (net.Conn, *DialOptions, error) {
-       var timeout time.Duration
-       t, ok := ctx.Deadline()
-       if ok {
-               timeout = t.Sub(time.Now())
-       }
-       dialOpts := &DialOptions{
-               Network: "tcp",
-               Address: address,
-               Timeout: timeout,
-       }
-       select {
-       case <-ctx.Done():
-               return nil, dialOpts, ctx.Err()
-       default:
-       }
-       conn, err := dialWithTimeout(dialOpts)
-       return conn, dialOpts, err
-}
-
-func dialWithTimeout(opts *DialOptions) (net.Conn, error) {
-       if len(opts.CACertFile) == 0 {
-               return net.DialTimeout(opts.Network, opts.Address, opts.Timeout)
-       }
-
-       tlsConf := &tls.Config{}
-       if opts.CACertFile == "none" {
-               tlsConf.InsecureSkipVerify = true
-       } else {
-               if len(opts.TLSServerName) == 0 {
-                       opts.TLSServerName = opts.Address
-               }
-               tlsConf.ServerName = opts.TLSServerName
-               certPool, err := getCertPool(opts.CACertFile)
-               if err != nil {
-                       return nil, err
-               }
-
-               tlsConf.RootCAs = certPool
-
-               if len(opts.TLSCertFile) != 0 {
-                       cert, err := tls.LoadX509KeyPair(opts.TLSCertFile, opts.TLSKeyFile)
-                       if err != nil {
-                               return nil, err
-                       }
-                       tlsConf.Certificates = []tls.Certificate{cert}
-               }
-       }
-       return tls.DialWithDialer(&net.Dialer{Timeout: opts.Timeout}, opts.Network, opts.Address, tlsConf)
-}
-
-func getCertPool(caCertFile string) (*x509.CertPool, error) {
-       if caCertFile != "root" {
-               ca, err := ioutil.ReadFile(caCertFile)
-               if err != nil {
-                       return nil, err
-               }
-               certPool := x509.NewCertPool()
-               ok := certPool.AppendCertsFromPEM(ca)
-               if !ok {
-                       return nil, err
-               }
-               return certPool, nil
-       }
-       return nil, nil
-}

Reply via email to