This is an automated email from the ASF dual-hosted git repository.

dcelasun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/thrift.git


The following commit(s) were added to refs/heads/master by this push:
     new e382275  THRIFT-5214: Push read deadline in socketConn
e382275 is described below

commit e382275bad2bd11fb5df33dd7db520fd7596f4ac
Author: Yuxuan 'fishy' Wang <[email protected]>
AuthorDate: Mon Jun 8 06:06:17 2020 -0700

    THRIFT-5214: Push read deadline in socketConn
    
    Client: go
    
    We added socketConn to the Go library for connectivity checks in
    https://github.com/apache/thrift/pull/2153, but forgot to push the
    read deadline on the socket when doing the connectivity checks. This
    caused a large number of connectivity checks to fail with I/O
    timeout errors.
---
 lib/go/thrift/socket.go           |  8 +++++++-
 lib/go/thrift/socket_conn.go      |  5 ++++-
 lib/go/thrift/socket_unix_conn.go | 16 ++++++++++++----
 lib/go/thrift/ssl_socket.go       |  9 ++++++++-
 4 files changed, 31 insertions(+), 7 deletions(-)

diff --git a/lib/go/thrift/socket.go b/lib/go/thrift/socket.go
index 7c765f5..5080894 100644
--- a/lib/go/thrift/socket.go
+++ b/lib/go/thrift/socket.go
@@ -58,7 +58,9 @@ func NewTSocketFromAddrTimeout(addr net.Addr, connTimeout time.Duration, soTimeo
 
 // Creates a TSocket from an existing net.Conn
 func NewTSocketFromConnTimeout(conn net.Conn, connTimeout time.Duration) *TSocket {
-       return &TSocket{conn: wrapSocketConn(conn), addr: conn.RemoteAddr(), connectTimeout: connTimeout, socketTimeout: connTimeout}
+       sock := &TSocket{conn: wrapSocketConn(conn), addr: conn.RemoteAddr(), connectTimeout: connTimeout, socketTimeout: connTimeout}
+       sock.conn.socketTimeout = connTimeout
+       return sock
 }
 
 // Sets the connect timeout
@@ -70,6 +72,9 @@ func (p *TSocket) SetConnTimeout(timeout time.Duration) error {
 // Sets the socket timeout
 func (p *TSocket) SetSocketTimeout(timeout time.Duration) error {
        p.socketTimeout = timeout
+       if p.conn != nil {
+               p.conn.socketTimeout = timeout
+       }
        return nil
 }
 
@@ -109,6 +114,7 @@ func (p *TSocket) Open() error {
        )); err != nil {
                return NewTTransportException(NOT_OPEN, err.Error())
        }
+       p.conn.socketTimeout = p.socketTimeout
        return nil
 }
 
diff --git a/lib/go/thrift/socket_conn.go b/lib/go/thrift/socket_conn.go
index b0f7b3e..5ed598e 100644
--- a/lib/go/thrift/socket_conn.go
+++ b/lib/go/thrift/socket_conn.go
@@ -23,13 +23,16 @@ import (
        "bytes"
        "io"
        "net"
+       "time"
 )
 
 // socketConn is a wrapped net.Conn that tries to do connectivity check.
 type socketConn struct {
        net.Conn
 
-       buf bytes.Buffer
+       socketTimeout time.Duration
+       buf           bytes.Buffer
+       buffer        [1]byte
 }
 
 var _ net.Conn = (*socketConn)(nil)
diff --git a/lib/go/thrift/socket_unix_conn.go b/lib/go/thrift/socket_unix_conn.go
index f18e0e6..789b4fa 100644
--- a/lib/go/thrift/socket_unix_conn.go
+++ b/lib/go/thrift/socket_unix_conn.go
@@ -24,6 +24,7 @@ package thrift
 import (
        "io"
        "syscall"
+       "time"
 )
 
 func (sc *socketConn) read0() error {
@@ -36,16 +37,23 @@ func (sc *socketConn) checkConn() error {
                // No way to check, return nil
                return nil
        }
+
+       // Push read deadline
+       var t time.Time
+       if sc.socketTimeout > 0 {
+               t = time.Now().Add(sc.socketTimeout)
+       }
+       sc.Conn.SetReadDeadline(t)
+
        rc, err := syscallConn.SyscallConn()
        if err != nil {
                return err
        }
 
        var n int
-       var buf [1]byte
 
        if readErr := rc.Read(func(fd uintptr) bool {
-               n, err = syscall.Read(int(fd), buf[:])
+               n, err = syscall.Read(int(fd), sc.buffer[:])
                return true
        }); readErr != nil {
                return readErr
@@ -58,9 +66,9 @@ func (sc *socketConn) checkConn() error {
        }
 
        if n > 0 {
-               // We got 1 byte,
+               // We got something,
                // put it to sc's buf for the next real read to use.
-               sc.buf.Write(buf[:])
+               sc.buf.Write(sc.buffer[:n])
                return nil
        }
 
diff --git a/lib/go/thrift/ssl_socket.go b/lib/go/thrift/ssl_socket.go
index 661111c..6e90438 100644
--- a/lib/go/thrift/ssl_socket.go
+++ b/lib/go/thrift/ssl_socket.go
@@ -62,12 +62,17 @@ func NewTSSLSocketFromAddrTimeout(addr net.Addr, cfg *tls.Config, timeout time.D
 
 // Creates a TSSLSocket from an existing net.Conn
 func NewTSSLSocketFromConnTimeout(conn net.Conn, cfg *tls.Config, timeout time.Duration) *TSSLSocket {
-       return &TSSLSocket{conn: wrapSocketConn(conn), addr: conn.RemoteAddr(), timeout: timeout, cfg: cfg}
+       sock := &TSSLSocket{conn: wrapSocketConn(conn), addr: conn.RemoteAddr(), timeout: timeout, cfg: cfg}
+       sock.conn.socketTimeout = timeout
+       return sock
 }
 
 // Sets the socket timeout
 func (p *TSSLSocket) SetTimeout(timeout time.Duration) error {
        p.timeout = timeout
+       if p.conn != nil {
+               p.conn.socketTimeout = timeout
+       }
        return nil
 }
 
@@ -101,6 +106,7 @@ func (p *TSSLSocket) Open() error {
                )); err != nil {
                        return NewTTransportException(NOT_OPEN, err.Error())
                }
+               p.conn.socketTimeout = p.timeout
        } else {
                if p.conn.isValid() {
                        return NewTTransportException(ALREADY_OPEN, "Socket already connected.")
@@ -124,6 +130,7 @@ func (p *TSSLSocket) Open() error {
                )); err != nil {
                        return NewTTransportException(NOT_OPEN, err.Error())
                }
+               p.conn.socketTimeout = p.timeout
        }
        return nil
 }

Reply via email to