This is an automated email from the ASF dual-hosted git repository.

alexstocks pushed a commit to branch improve/delte-handleLoop
in repository https://gitbox.apache.org/repos/asf/dubbo-getty.git

commit 675313bd355ecdea0a354696e0d84af1f9555768
Author: AlexStocks <[email protected]>
AuthorDate: Sun Jan 17 22:13:08 2021 +0800

    delete session.handleLoop
---
 client.go  |  15 +++++----
 go.mod     |   2 +-
 go.sum     |  10 ++++--
 server.go  |   3 +-
 session.go | 106 ++++++++++++++++++++++++-------------------------------------
 5 files changed, 60 insertions(+), 76 deletions(-)

diff --git a/client.go b/client.go
index a11c73b..d7e86fd 100644
--- a/client.go
+++ b/client.go
@@ -22,6 +22,7 @@ import (
        "crypto/x509"
        "encoding/pem"
        "fmt"
+       gxtime "github.com/dubbogo/gost/time"
        "io/ioutil"
        "net"
        "strings"
@@ -170,7 +171,7 @@ func (c *client) dialTCP() Session {
                }
 
                log.Infof("net.DialTimeout(addr:%s, timeout:%v) = error:%+v", 
c.addr, connectTimeout, perrors.WithStack(err))
-               <-wheel.After(connectInterval)
+               <-gxtime.After(connectInterval)
        }
 }
 
@@ -202,7 +203,7 @@ func (c *client) dialUDP() Session {
                }
                if err != nil {
                        log.Warnf("net.DialTimeout(addr:%s, timeout:%v) = 
error:%+v", c.addr, perrors.WithStack(err))
-                       <-wheel.After(connectInterval)
+                       <-gxtime.After(connectInterval)
                        continue
                }
 
@@ -211,7 +212,7 @@ func (c *client) dialUDP() Session {
                if length, err = conn.Write(connectPingPackage[:]); err != nil {
                        conn.Close()
                        log.Warnf("conn.Write(%s) = {length:%d, err:%+v}", 
string(connectPingPackage), length, perrors.WithStack(err))
-                       <-wheel.After(connectInterval)
+                       <-gxtime.After(connectInterval)
                        continue
                }
                conn.SetReadDeadline(time.Now().Add(1e9))
@@ -222,7 +223,7 @@ func (c *client) dialUDP() Session {
                if err != nil {
                        log.Infof("conn{%#v}.Read() = {length:%d, err:%+v}", 
conn, length, perrors.WithStack(err))
                        conn.Close()
-                       <-wheel.After(connectInterval)
+                       <-gxtime.After(connectInterval)
                        continue
                }
                //if err == nil {
@@ -260,7 +261,7 @@ func (c *client) dialWS() Session {
                }
 
                log.Infof("websocket.dialer.Dial(addr:%s) = error:%+v", c.addr, 
perrors.WithStack(err))
-               <-wheel.After(connectInterval)
+               <-gxtime.After(connectInterval)
        }
 }
 
@@ -338,7 +339,7 @@ func (c *client) dialWSS() Session {
                }
 
                log.Infof("websocket.dialer.Dial(addr:%s) = error:%+v", c.addr, 
perrors.WithStack(err))
-               <-wheel.After(connectInterval)
+               <-gxtime.After(connectInterval)
        }
 }
 
@@ -445,7 +446,7 @@ func (c *client) reConnect() {
                if maxTimes < times {
                        times = maxTimes
                }
-               <-wheel.After(time.Duration(int64(times) * int64(interval)))
+               <-gxtime.After(time.Duration(int64(times) * int64(interval)))
        }
 }
 
diff --git a/go.mod b/go.mod
index 6dd2917..baf4991 100644
--- a/go.mod
+++ b/go.mod
@@ -3,7 +3,7 @@ module github.com/apache/dubbo-getty
 go 1.14
 
 require (
-       github.com/dubbogo/gost v1.10.1
+       github.com/dubbogo/gost v1.10.4
        github.com/golang/snappy v0.0.1
        github.com/gorilla/websocket v1.4.2
        github.com/pkg/errors v0.9.1
diff --git a/go.sum b/go.sum
index 175c5ae..c5ead10 100644
--- a/go.sum
+++ b/go.sum
@@ -5,8 +5,8 @@ github.com/StackExchange/wmi 
v0.0.0-20190523213315-cbe66965904d/go.mod h1:3eOhrU
 github.com/davecgh/go-spew v1.1.0/go.mod 
h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
 github.com/davecgh/go-spew v1.1.1 
h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
 github.com/davecgh/go-spew v1.1.1/go.mod 
h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
-github.com/dubbogo/gost v1.10.1 h1:39kF9Cd5JOiMpmwG6dX1/aLWNFqFv9gHp8HrhzMmjLY=
-github.com/dubbogo/gost v1.10.1/go.mod 
h1:+mQGS51XQEUWZP2JeGZTxJwipjRKtJO7Tr+FOg+72rI=
+github.com/dubbogo/gost v1.10.4 h1:z9Kw3tgLc9cDmA3gu0hgzjr/slsprZNzssK4zXFqo8s=
+github.com/dubbogo/gost v1.10.4/go.mod 
h1:w8Yw29eDWtRVo3tx9nPpHkNZnOi4SRx1fZf7eVlAAU4=
 github.com/dubbogo/jsonparser v1.0.1/go.mod 
h1:tYAtpctvSP/tWw4MeelsowSPgXQRVHHWbqL6ynps8jU=
 github.com/go-ole/go-ole v1.2.4 h1:nNBDSCOigTSiarFpYE9J/KtEA1IOW4CNeqT9TQDqCxI=
 github.com/go-ole/go-ole v1.2.4/go.mod 
h1:XCwSNxSkXRo4vlyPy93sltvi/qJq0jqQhjqQNIwKuxM=
@@ -15,7 +15,9 @@ github.com/golang/snappy v0.0.1/go.mod 
h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEW
 github.com/google/renameio v0.1.0/go.mod 
h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI=
 github.com/gorilla/websocket v1.4.2 
h1:+/TMaTYc4QFitKJxsQ7Yye35DkWvkdLcvGKqM+x0Ufc=
 github.com/gorilla/websocket v1.4.2/go.mod 
h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
+github.com/k0kubun/colorstring v0.0.0-20150214042306-9440f1994b88 
h1:uC1QfSlInpQF+M0ao65imhwqKnz3Q2z/d8PWZRMQvDM=
 github.com/k0kubun/colorstring v0.0.0-20150214042306-9440f1994b88/go.mod 
h1:3w7q1U84EfirKl04SVQ/s7nPm1ZPhiXd34z40TNz36k=
+github.com/k0kubun/pp v3.0.1+incompatible 
h1:3tqvf7QgUnZ5tXO6pNAZlrvHgl6DvifjDrd9g2S9Z40=
 github.com/k0kubun/pp v3.0.1+incompatible/go.mod 
h1:GWse8YhT0p8pT4ir3ZgBbfZild3tgzSScAn6HmfYukg=
 github.com/kisielk/gotool v1.0.0/go.mod 
h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
 github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI=
@@ -23,7 +25,9 @@ github.com/kr/pretty v0.1.0/go.mod 
h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORN
 github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
 github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
 github.com/kr/text v0.1.0/go.mod 
h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
+github.com/mattn/go-colorable v0.1.7 
h1:bQGKb3vps/j0E9GfJQ03JyhRuxsvdAanXlT9BTw3mdw=
 github.com/mattn/go-colorable v0.1.7/go.mod 
h1:u6P/XSegPjTcexA+o6vUJrdnUu04hMope9wVRipJSqc=
+github.com/mattn/go-isatty v0.0.12 
h1:wuysRhFDzyxgEmMf5xjvJ2M9dZoWAXNNr5LSBS7uHXY=
 github.com/mattn/go-isatty v0.0.12/go.mod 
h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Kysco4FUpU=
 github.com/pkg/errors v0.8.1/go.mod 
h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
 github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
@@ -40,6 +44,8 @@ github.com/stretchr/testify v1.6.1 
h1:hDPOHmpOpP40lSULcqw7IrRb/u7w6RpDC9399XyoNd
 github.com/stretchr/testify v1.6.1/go.mod 
h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
 go.uber.org/atomic v1.6.0 h1:Ezj3JGmsOnG1MoRWQkPBsKLe9DwWD9QeXzTRzzldNVk=
 go.uber.org/atomic v1.6.0/go.mod 
h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ=
+go.uber.org/atomic v1.7.0 h1:ADUqmZGgLDDfbSL9ZmPxKTybcoEYHgpYfELNoN+7hsw=
+go.uber.org/atomic v1.7.0/go.mod 
h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
 go.uber.org/multierr v1.5.0 h1:KCa4XfM8CWFCpxXRGok+Q0SS/0XBhMDbHHGABQLvD2A=
 go.uber.org/multierr v1.5.0/go.mod 
h1:FeouvMocqHpRaaGuG9EjoKcStLC43Zu/fmqdUMPcKYU=
 go.uber.org/tools v0.0.0-20190618225709-2cfd321de3ee 
h1:0mgffUl7nfd+FpvXMVz4IDEaUSmT1ysygQC7qYo7sG4=
diff --git a/server.go b/server.go
index 9f5e586..aa3bb6d 100644
--- a/server.go
+++ b/server.go
@@ -22,6 +22,7 @@ import (
        "crypto/tls"
        "crypto/x509"
        "fmt"
+       gxtime "github.com/dubbogo/gost/time"
        "io/ioutil"
        "net"
        "net/http"
@@ -268,7 +269,7 @@ func (s *server) runTcpEventLoop(newSession 
NewSessionCallback) {
                                return
                        }
                        if delay != 0 {
-                               <-wheel.After(delay)
+                               <-gxtime.After(delay)
                        }
                        client, err = s.accept(newSession)
                        if err != nil {
diff --git a/session.go b/session.go
index e9176d1..4f88b27 100644
--- a/session.go
+++ b/session.go
@@ -59,17 +59,12 @@ const (
 /////////////////////////////////////////
 
 var (
-       wheel *gxtime.Wheel
+       defaultTimerWheel *gxtime.TimerWheel
 )
 
 func init() {
-       span := 100e6 // 100ms
-       buckets := MaxWheelTimeSpan / span
-       wheel = gxtime.NewWheel(time.Duration(span), int(buckets)) // wheel 
longest span is 15 minute
-}
-
-func GetTimeWheel() *gxtime.Wheel {
-       return wheel
+       gxtime.InitDefaultTimerWheel()
+       defaultTimerWheel = gxtime.GetDefaultTimerWheel()
 }
 
 // getty base session
@@ -102,8 +97,6 @@ type session struct {
 
        // goroutines sync
        grNum int32
-       // read goroutines done signal
-       rDone chan struct{}
        lock  sync.RWMutex
 }
 
@@ -122,7 +115,6 @@ func newSession(endPoint EndPoint, conn Connection) 
*session {
                done:  make(chan struct{}),
                wait:  pendingDuration,
                attrs: gxcontext.NewValuesContext(context.Background()),
-               rDone: make(chan struct{}),
        }
 
        ss.Connection.setSession(ss)
@@ -164,7 +156,6 @@ func (s *session) Reset() {
                period: period,
                wait:   pendingDuration,
                attrs:  gxcontext.NewValuesContext(context.Background()),
-               rDone:  make(chan struct{}),
        }
 }
 
@@ -460,6 +451,34 @@ func (s *session) WriteBytesArray(pkgs ...[]byte) error {
        return nil
 }
 
+func sessionTimerLoop(_ gxtime.TimerID, _ time.Time, arg interface{}) error {
+       ss, _ := arg.(*session)
+       if ss != nil && ss.IsClosed() {
+               return ErrSessionClosed
+       }
+
+       f := func() {
+               wsConn, wsFlag := ss.Connection.(*gettyWSConn)
+               if wsFlag {
+                       err := wsConn.writePing()
+                       if err != nil {
+                               log.Warnf("wsConn.writePing() = error:%+v", 
perrors.WithStack(err))
+                       }
+               }
+
+               ss.listener.OnCron(ss)
+       }
+
+       // if enable task pool, run @f asynchronously.
+       if taskPool := ss.EndPoint().GetTaskPool(); taskPool != nil {
+               taskPool.AddTaskAlways(f)
+               return nil
+       }
+
+       f()
+       return nil
+}
+
 // func (s *session) RunEventLoop() {
 func (s *session) run() {
        if s.Connection == nil || s.listener == nil || s.writer == nil {
@@ -477,56 +496,12 @@ func (s *session) run() {
                return
        }
 
-       // start read/write gr
-       atomic.AddInt32(&(s.grNum), 2)
-       go s.handleLoop()
-       go s.handlePackage()
-}
-
-func (s *session) handleLoop() {
-       var (
-               wsFlag  bool
-               wsConn  *gettyWSConn
-               counter gxtime.CountWatch
-       )
-
-       defer func() {
-               if r := recover(); r != nil {
-                       const size = 64 << 10
-                       rBuf := make([]byte, size)
-                       rBuf = rBuf[:runtime.Stack(rBuf, false)]
-                       log.Errorf("[session.handleLoop] panic session %s: 
err=%s\n%s", s.sessionToken(), r, rBuf)
-               }
-
-               grNum := atomic.AddInt32(&(s.grNum), -1)
-               s.listener.OnClose(s)
-               log.Infof("%s, [session.handleLoop] goroutine exit now, left gr 
num %d", s.Stat(), grNum)
-               s.gc()
-       }()
-
-       wsConn, wsFlag = s.Connection.(*gettyWSConn)
-LOOP:
-       for {
-               select {
-               case <-s.done:
-                       // this case branch assure the (session)handleLoop gr 
will exit after (session)handlePackage gr.
-                       <-s.rDone
-                       counter.Start()
-                       if counter.Count() > s.wait.Nanoseconds() {
-                               log.Infof("%s, [session.handleLoop] got done 
signal ", s.Stat())
-                               break LOOP
-                       }
-
-               case <-wheel.After(s.period):
-                       if wsFlag {
-                               err := wsConn.writePing()
-                               if err != nil {
-                                       log.Warnf("wsConn.writePing() = 
error:%+v", perrors.WithStack(err))
-                               }
-                       }
-                       s.listener.OnCron(s)
-               }
+       atomic.AddInt32(&(s.grNum), 1)
+       if _, err := defaultTimerWheel.AddTimer(sessionTimerLoop, 
gxtime.TimerLoop, s.period, s); err != nil {
+               panic(fmt.Sprintf("failed to add session %s to 
defaultTimerWheel", s.Stat()))
        }
+       // start read gr
+       go s.handlePackage()
 }
 
 func (s *session) addTask(pkg interface{}) {
@@ -553,8 +528,6 @@ func (s *session) handlePackage() {
                        rBuf = rBuf[:runtime.Stack(rBuf, false)]
                        log.Errorf("[session.handlePackage] panic session %s: 
err=%s\n%s", s.sessionToken(), r, rBuf)
                }
-
-               close(s.rDone)
                grNum := atomic.AddInt32(&(s.grNum), -1)
                log.Infof("%s, [session.handlePackage] gr will exit now, left 
gr num %d", s.sessionToken(), grNum)
                s.stop()
@@ -564,6 +537,9 @@ func (s *session) handlePackage() {
                                s.listener.OnError(s, err)
                        }
                }
+
+               s.listener.OnClose(s)
+               s.gc()
        }()
 
        if _, ok := s.Connection.(*gettyTCPConn); ok {
@@ -702,8 +678,8 @@ func (s *session) handleUDPPackage() error {
        if int(s.maxMsgLen<<1) < bufLen {
                maxBufLen = int(s.maxMsgLen << 1)
        }
-       bufp = gxbytes.GetBytes(maxBufLen)
-       defer gxbytes.PutBytes(bufp)
+       bufp = gxbytes.AcquireBytes(maxBufLen)
+       defer gxbytes.ReleaseBytes(bufp)
        buf = *bufp
        for {
                if s.IsClosed() {

Reply via email to