This is an automated email from the ASF dual-hosted git repository. gosonzhang pushed a commit to branch INLONG-25 in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
commit a1c86bccac13bb68d58eebb360b3610fa46eb96b Author: Zijie Lu <[email protected]> AuthorDate: Sat May 1 11:15:35 2021 +0800 nitpick Signed-off-by: Zijie Lu <[email protected]> --- .../tubemq-client-go/multiplexing/multiplexing.go | 216 ++++++++++----------- 1 file changed, 108 insertions(+), 108 deletions(-) diff --git a/tubemq-client-twins/tubemq-client-go/multiplexing/multiplexing.go b/tubemq-client-twins/tubemq-client-go/multiplexing/multiplexing.go index 06e09d2..d1ee603 100644 --- a/tubemq-client-twins/tubemq-client-go/multiplexing/multiplexing.go +++ b/tubemq-client-twins/tubemq-client-go/multiplexing/multiplexing.go @@ -112,6 +112,73 @@ func (p *Pool) Get(ctx context.Context, address string, serialNo uint32) (*Multi return c.new(ctx, serialNo) } +func dial(ctx context.Context, address string) (net.Conn, *DialOptions, error) { + var timeout time.Duration + t, ok := ctx.Deadline() + if ok { + timeout = t.Sub(time.Now()) + } + dialOpts := &DialOptions{ + Network: "tcp", + Address: address, + Timeout: timeout, + } + select { + case <-ctx.Done(): + return nil, dialOpts, ctx.Err() + default: + } + conn, err := dialWithTimeout(dialOpts) + return conn, dialOpts, err +} + +func dialWithTimeout(opts *DialOptions) (net.Conn, error) { + if len(opts.CACertFile) == 0 { + return net.DialTimeout(opts.Network, opts.Address, opts.Timeout) + } + + tlsConf := &tls.Config{} + if opts.CACertFile == "none" { + tlsConf.InsecureSkipVerify = true + } else { + if len(opts.TLSServerName) == 0 { + opts.TLSServerName = opts.Address + } + tlsConf.ServerName = opts.TLSServerName + certPool, err := getCertPool(opts.CACertFile) + if err != nil { + return nil, err + } + + tlsConf.RootCAs = certPool + + if len(opts.TLSCertFile) != 0 { + cert, err := tls.LoadX509KeyPair(opts.TLSCertFile, opts.TLSKeyFile) + if err != nil { + return nil, err + } + tlsConf.Certificates = []tls.Certificate{cert} + } + } + return tls.DialWithDialer(&net.Dialer{Timeout: opts.Timeout}, opts.Network, opts.Address, tlsConf) +} + +func getCertPool(caCertFile string) (*x509.CertPool, error) { + if caCertFile != "root" { + ca, err := ioutil.ReadFile(caCertFile) + if err != nil { + return nil, err + } + certPool := x509.NewCertPool() + ok := certPool.AppendCertsFromPEM(ca) + if !ok { + return nil, err + } + return certPool, nil + } + return nil, nil +} + type recvReader struct { ctx context.Context recv chan codec.TransportResponse @@ -260,6 +327,35 @@ func (c *Connection) reconnect() error { return nil } +// The response handling logic of the TCP connection. +// 1. Read from the connection and decode it to the TransportResponse. +// 2. Send the response to the corresponding multiplex connection based on the serialNo. +func (c *Connection) reader() { + var lastErr error + for { + select { + case <-c.done: + return + default: + } + rsp, err := c.decoder.Decode() + if err != nil { + lastErr = err + break + } + serialNo := rsp.GetSerialNo() + c.mu.RLock() + mc, ok := c.connections[serialNo] + c.mu.RUnlock() + if !ok { + continue + } + mc.reader.recv <- rsp + mc.conn.remove(rsp.GetSerialNo()) + } + c.close(lastErr, c.done) +} + func (c *Connection) writer() { var lastErr error for { @@ -281,6 +377,18 @@ func (c *Connection) writer() { c.close(lastErr, c.done) } +func (c *Connection) write(b []byte) error { + sent := 0 + for sent < len(b) { + n, err := c.conn.Write(b[sent:]) + if err != nil { + return err + } + sent += n + } + return nil +} + func (c *Connection) send(b []byte) error { if c.state == Closed { return ErrConnClosed @@ -300,47 +408,6 @@ func (c *Connection) remove(id uint32) { c.mu.Unlock() } -func (c *Connection) write(b []byte) error { - sent := 0 - for sent < len(b) { - n, err := c.conn.Write(b[sent:]) - if err != nil { - return err - } - sent += n - } - return nil -} - -// The response handling logic of the TCP connection. -// 1. Read from the connection and decode it to the TransportResponse. -// 2. Send the response to the corresponding multiplex connection based on the serialNo. -func (c *Connection) reader() { - var lastErr error - for { - select { - case <-c.done: - return - default: - } - rsp, err := c.decoder.Decode() - if err != nil { - lastErr = err - break - } - serialNo := rsp.GetSerialNo() - c.mu.RLock() - mc, ok := c.connections[serialNo] - c.mu.RUnlock() - if !ok { - continue - } - mc.reader.recv <- rsp - mc.conn.remove(rsp.GetSerialNo()) - } - c.close(lastErr, c.done) -} - type writerBuffer struct { buffer chan []byte done <-chan struct{} @@ -354,70 +421,3 @@ func (w *writerBuffer) get() ([]byte, error) { return nil, ErrWriteBufferDone } } - -func dial(ctx context.Context, address string) (net.Conn, *DialOptions, error) { - var timeout time.Duration - t, ok := ctx.Deadline() - if ok { - timeout = t.Sub(time.Now()) - } - dialOpts := &DialOptions{ - Network: "tcp", - Address: address, - Timeout: timeout, - } - select { - case <-ctx.Done(): - return nil, dialOpts, ctx.Err() - default: - } - conn, err := dialWithTimeout(dialOpts) - return conn, dialOpts, err -} - -func dialWithTimeout(opts *DialOptions) (net.Conn, error) { - if len(opts.CACertFile) == 0 { - return net.DialTimeout(opts.Network, opts.Address, opts.Timeout) - } - - tlsConf := &tls.Config{} - if opts.CACertFile == "none" { - tlsConf.InsecureSkipVerify = true - } else { - if len(opts.TLSServerName) == 0 { - opts.TLSServerName = opts.Address - } - tlsConf.ServerName = opts.TLSServerName - certPool, err := getCertPool(opts.CACertFile) - if err != nil { - return nil, err - } - - tlsConf.RootCAs = certPool - - if len(opts.TLSCertFile) != 0 { - cert, err := tls.LoadX509KeyPair(opts.TLSCertFile, opts.TLSKeyFile) - if err != nil { - return nil, err - } - tlsConf.Certificates = []tls.Certificate{cert} - } - } - return tls.DialWithDialer(&net.Dialer{Timeout: opts.Timeout}, opts.Network, opts.Address, tlsConf) -} - -func getCertPool(caCertFile string) (*x509.CertPool, error) { - if caCertFile != "root" { - ca, err := ioutil.ReadFile(caCertFile) - if err != nil { - return nil, err - } - certPool := x509.NewCertPool() - ok := certPool.AppendCertsFromPEM(ca) - if !ok { - return nil, err - } - return certPool, nil - } - return nil, nil -}
