This is an automated email from the ASF dual-hosted git repository. alexstocks pushed a commit to branch improve/delte-handleLoop in repository https://gitbox.apache.org/repos/asf/dubbo-getty.git
commit 675313bd355ecdea0a354696e0d84af1f9555768 Author: AlexStocks <[email protected]> AuthorDate: Sun Jan 17 22:13:08 2021 +0800 delete session.handleLoop --- client.go | 15 +++++---- go.mod | 2 +- go.sum | 10 ++++-- server.go | 3 +- session.go | 106 ++++++++++++++++++++++++------------------------------------- 5 files changed, 60 insertions(+), 76 deletions(-) diff --git a/client.go b/client.go index a11c73b..d7e86fd 100644 --- a/client.go +++ b/client.go @@ -22,6 +22,7 @@ import ( "crypto/x509" "encoding/pem" "fmt" + gxtime "github.com/dubbogo/gost/time" "io/ioutil" "net" "strings" @@ -170,7 +171,7 @@ func (c *client) dialTCP() Session { } log.Infof("net.DialTimeout(addr:%s, timeout:%v) = error:%+v", c.addr, connectTimeout, perrors.WithStack(err)) - <-wheel.After(connectInterval) + <-gxtime.After(connectInterval) } } @@ -202,7 +203,7 @@ func (c *client) dialUDP() Session { } if err != nil { log.Warnf("net.DialTimeout(addr:%s, timeout:%v) = error:%+v", c.addr, perrors.WithStack(err)) - <-wheel.After(connectInterval) + <-gxtime.After(connectInterval) continue } @@ -211,7 +212,7 @@ func (c *client) dialUDP() Session { if length, err = conn.Write(connectPingPackage[:]); err != nil { conn.Close() log.Warnf("conn.Write(%s) = {length:%d, err:%+v}", string(connectPingPackage), length, perrors.WithStack(err)) - <-wheel.After(connectInterval) + <-gxtime.After(connectInterval) continue } conn.SetReadDeadline(time.Now().Add(1e9)) @@ -222,7 +223,7 @@ func (c *client) dialUDP() Session { if err != nil { log.Infof("conn{%#v}.Read() = {length:%d, err:%+v}", conn, length, perrors.WithStack(err)) conn.Close() - <-wheel.After(connectInterval) + <-gxtime.After(connectInterval) continue } //if err == nil { @@ -260,7 +261,7 @@ func (c *client) dialWS() Session { } log.Infof("websocket.dialer.Dial(addr:%s) = error:%+v", c.addr, perrors.WithStack(err)) - <-wheel.After(connectInterval) + <-gxtime.After(connectInterval) } } @@ -338,7 +339,7 @@ func (c *client) dialWSS() Session { } log.Infof("websocket.dialer.Dial(addr:%s) = error:%+v", c.addr, perrors.WithStack(err)) - <-wheel.After(connectInterval) + <-gxtime.After(connectInterval) } } @@ -445,7 +446,7 @@ func (c *client) reConnect() { if maxTimes < times { times = maxTimes } - <-wheel.After(time.Duration(int64(times) * int64(interval))) + <-gxtime.After(time.Duration(int64(times) * int64(interval))) } } diff --git a/go.mod b/go.mod index 6dd2917..baf4991 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,7 @@ module github.com/apache/dubbo-getty go 1.14 require ( - github.com/dubbogo/gost v1.10.1 + github.com/dubbogo/gost v1.10.4 github.com/golang/snappy v0.0.1 github.com/gorilla/websocket v1.4.2 github.com/pkg/errors v0.9.1 diff --git a/go.sum b/go.sum index 175c5ae..c5ead10 100644 --- a/go.sum +++ b/go.sum @@ -5,8 +5,8 @@ github.com/StackExchange/wmi v0.0.0-20190523213315-cbe66965904d/go.mod h1:3eOhrU github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/dubbogo/gost v1.10.1 h1:39kF9Cd5JOiMpmwG6dX1/aLWNFqFv9gHp8HrhzMmjLY= -github.com/dubbogo/gost v1.10.1/go.mod h1:+mQGS51XQEUWZP2JeGZTxJwipjRKtJO7Tr+FOg+72rI= +github.com/dubbogo/gost v1.10.4 h1:z9Kw3tgLc9cDmA3gu0hgzjr/slsprZNzssK4zXFqo8s= +github.com/dubbogo/gost v1.10.4/go.mod h1:w8Yw29eDWtRVo3tx9nPpHkNZnOi4SRx1fZf7eVlAAU4= github.com/dubbogo/jsonparser v1.0.1/go.mod h1:tYAtpctvSP/tWw4MeelsowSPgXQRVHHWbqL6ynps8jU= github.com/go-ole/go-ole v1.2.4 h1:nNBDSCOigTSiarFpYE9J/KtEA1IOW4CNeqT9TQDqCxI= github.com/go-ole/go-ole v1.2.4/go.mod h1:XCwSNxSkXRo4vlyPy93sltvi/qJq0jqQhjqQNIwKuxM= @@ -15,7 +15,9 @@ github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEW github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI= github.com/gorilla/websocket v1.4.2 h1:+/TMaTYc4QFitKJxsQ7Yye35DkWvkdLcvGKqM+x0Ufc= github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= +github.com/k0kubun/colorstring v0.0.0-20150214042306-9440f1994b88 h1:uC1QfSlInpQF+M0ao65imhwqKnz3Q2z/d8PWZRMQvDM= github.com/k0kubun/colorstring v0.0.0-20150214042306-9440f1994b88/go.mod h1:3w7q1U84EfirKl04SVQ/s7nPm1ZPhiXd34z40TNz36k= +github.com/k0kubun/pp v3.0.1+incompatible h1:3tqvf7QgUnZ5tXO6pNAZlrvHgl6DvifjDrd9g2S9Z40= github.com/k0kubun/pp v3.0.1+incompatible/go.mod h1:GWse8YhT0p8pT4ir3ZgBbfZild3tgzSScAn6HmfYukg= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI= @@ -23,7 +25,9 @@ github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORN github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= +github.com/mattn/go-colorable v0.1.7 h1:bQGKb3vps/j0E9GfJQ03JyhRuxsvdAanXlT9BTw3mdw= github.com/mattn/go-colorable v0.1.7/go.mod h1:u6P/XSegPjTcexA+o6vUJrdnUu04hMope9wVRipJSqc= +github.com/mattn/go-isatty v0.0.12 h1:wuysRhFDzyxgEmMf5xjvJ2M9dZoWAXNNr5LSBS7uHXY= github.com/mattn/go-isatty v0.0.12/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Kysco4FUpU= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= @@ -40,6 +44,8 @@ github.com/stretchr/testify v1.6.1 h1:hDPOHmpOpP40lSULcqw7IrRb/u7w6RpDC9399XyoNd github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= go.uber.org/atomic v1.6.0 h1:Ezj3JGmsOnG1MoRWQkPBsKLe9DwWD9QeXzTRzzldNVk= go.uber.org/atomic v1.6.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ= +go.uber.org/atomic v1.7.0 h1:ADUqmZGgLDDfbSL9ZmPxKTybcoEYHgpYfELNoN+7hsw= +go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= go.uber.org/multierr v1.5.0 h1:KCa4XfM8CWFCpxXRGok+Q0SS/0XBhMDbHHGABQLvD2A= go.uber.org/multierr v1.5.0/go.mod h1:FeouvMocqHpRaaGuG9EjoKcStLC43Zu/fmqdUMPcKYU= go.uber.org/tools v0.0.0-20190618225709-2cfd321de3ee h1:0mgffUl7nfd+FpvXMVz4IDEaUSmT1ysygQC7qYo7sG4= diff --git a/server.go b/server.go index 9f5e586..aa3bb6d 100644 --- a/server.go +++ b/server.go @@ -22,6 +22,7 @@ import ( "crypto/tls" "crypto/x509" "fmt" + gxtime "github.com/dubbogo/gost/time" "io/ioutil" "net" "net/http" @@ -268,7 +269,7 @@ func (s *server) runTcpEventLoop(newSession NewSessionCallback) { return } if delay != 0 { - <-wheel.After(delay) + <-gxtime.After(delay) } client, err = s.accept(newSession) if err != nil { diff --git a/session.go b/session.go index e9176d1..4f88b27 100644 --- a/session.go +++ b/session.go @@ -59,17 +59,12 @@ const ( ///////////////////////////////////////// var ( - wheel *gxtime.Wheel + defaultTimerWheel *gxtime.TimerWheel ) func init() { - span := 100e6 // 100ms - buckets := MaxWheelTimeSpan / span - wheel = gxtime.NewWheel(time.Duration(span), int(buckets)) // wheel longest span is 15 minute -} - -func GetTimeWheel() *gxtime.Wheel { - return wheel + gxtime.InitDefaultTimerWheel() + defaultTimerWheel = gxtime.GetDefaultTimerWheel() } // getty base session @@ -102,8 +97,6 @@ type session struct { // goroutines sync grNum int32 - // read goroutines done signal - rDone chan struct{} lock sync.RWMutex } @@ -122,7 +115,6 @@ func newSession(endPoint EndPoint, conn Connection) *session { done: make(chan struct{}), wait: pendingDuration, attrs: gxcontext.NewValuesContext(context.Background()), - rDone: make(chan struct{}), } ss.Connection.setSession(ss) @@ -164,7 +156,6 @@ func (s *session) Reset() { period: period, wait: pendingDuration, attrs: gxcontext.NewValuesContext(context.Background()), - rDone: make(chan struct{}), } } @@ -460,6 +451,34 @@ func (s *session) WriteBytesArray(pkgs ...[]byte) error { return nil } +func sessionTimerLoop(_ gxtime.TimerID, _ time.Time, arg interface{}) error { + ss, _ := arg.(*session) + if ss != nil && ss.IsClosed() { + return ErrSessionClosed + } + + f := func() { + wsConn, wsFlag := ss.Connection.(*gettyWSConn) + if wsFlag { + err := wsConn.writePing() + if err != nil { + log.Warnf("wsConn.writePing() = error:%+v", perrors.WithStack(err)) + } + } + + ss.listener.OnCron(ss) + } + + // if enable task pool, run @f asynchronously. + if taskPool := ss.EndPoint().GetTaskPool(); taskPool != nil { + taskPool.AddTaskAlways(f) + return nil + } + + f() + return nil +} + // func (s *session) RunEventLoop() { func (s *session) run() { if s.Connection == nil || s.listener == nil || s.writer == nil { @@ -477,56 +496,12 @@ func (s *session) run() { return } - // start read/write gr - atomic.AddInt32(&(s.grNum), 2) - go s.handleLoop() - go s.handlePackage() -} - -func (s *session) handleLoop() { - var ( - wsFlag bool - wsConn *gettyWSConn - counter gxtime.CountWatch - ) - - defer func() { - if r := recover(); r != nil { - const size = 64 << 10 - rBuf := make([]byte, size) - rBuf = rBuf[:runtime.Stack(rBuf, false)] - log.Errorf("[session.handleLoop] panic session %s: err=%s\n%s", s.sessionToken(), r, rBuf) - } - - grNum := atomic.AddInt32(&(s.grNum), -1) - s.listener.OnClose(s) - log.Infof("%s, [session.handleLoop] goroutine exit now, left gr num %d", s.Stat(), grNum) - s.gc() - }() - - wsConn, wsFlag = s.Connection.(*gettyWSConn) -LOOP: - for { - select { - case <-s.done: - // this case branch assure the (session)handleLoop gr will exit after (session)handlePackage gr. - <-s.rDone - counter.Start() - if counter.Count() > s.wait.Nanoseconds() { - log.Infof("%s, [session.handleLoop] got done signal ", s.Stat()) - break LOOP - } - - case <-wheel.After(s.period): - if wsFlag { - err := wsConn.writePing() - if err != nil { - log.Warnf("wsConn.writePing() = error:%+v", perrors.WithStack(err)) - } - } - s.listener.OnCron(s) - } + atomic.AddInt32(&(s.grNum), 1) + if _, err := defaultTimerWheel.AddTimer(sessionTimerLoop, gxtime.TimerLoop, s.period, s); err != nil { + panic(fmt.Sprintf("failed to add session %s to defaultTimerWheel", s.Stat())) } + // start read gr + go s.handlePackage() } func (s *session) addTask(pkg interface{}) { @@ -553,8 +528,6 @@ func (s *session) handlePackage() { rBuf = rBuf[:runtime.Stack(rBuf, false)] log.Errorf("[session.handlePackage] panic session %s: err=%s\n%s", s.sessionToken(), r, rBuf) } - - close(s.rDone) grNum := atomic.AddInt32(&(s.grNum), -1) log.Infof("%s, [session.handlePackage] gr will exit now, left gr num %d", s.sessionToken(), grNum) s.stop() @@ -564,6 +537,9 @@ func (s *session) handlePackage() { s.listener.OnError(s, err) } } + + s.listener.OnClose(s) + s.gc() }() if _, ok := s.Connection.(*gettyTCPConn); ok { @@ -702,8 +678,8 @@ func (s *session) handleUDPPackage() error { if int(s.maxMsgLen<<1) < bufLen { maxBufLen = int(s.maxMsgLen << 1) } - bufp = gxbytes.GetBytes(maxBufLen) - defer gxbytes.PutBytes(bufp) + bufp = gxbytes.AcquireBytes(maxBufLen) + defer gxbytes.ReleaseBytes(bufp) buf = *bufp for { if s.IsClosed() {
