This is an automated email from the ASF dual-hosted git repository.

alexstocks pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/dubbo-go-pixiu.git


The following commit(s) were added to refs/heads/develop by this push:
     new 119744bb Add Graceful Shutdown (#474)
119744bb is described below

commit 119744bbcb12aeaeef6cdebabd1e346a1c56f4bd
Author: Nanno <[email protected]>
AuthorDate: Sun Sep 4 14:00:20 2022 +0800

    Add Graceful Shutdown (#474)
---
 pixiu/pkg/common/shutdown/shutdown_signal.go | 37 ++++++++++++++++++++
 pixiu/pkg/listener/http/http_listener.go     | 17 +++++++--
 pixiu/pkg/listener/http2/http2_listener.go   | 46 +++++++++++++++++++-----
 pixiu/pkg/listener/listener.go               | 15 +++++++-
 pixiu/pkg/listener/tcp/server_handler.go     | 14 +++++++-
 pixiu/pkg/listener/tcp/tcp_listener.go       | 28 ++++++++++++---
 pixiu/pkg/listener/triple/triple_listener.go | 34 +++++++++++++++---
 pixiu/pkg/model/bootstrap.go                 | 36 +++++++++++++++++--
 pixiu/pkg/model/http.go                      |  2 +-
 pixiu/pkg/server/listener_manager.go         | 52 +++++++++++++++++++++++++++-
 10 files changed, 253 insertions(+), 28 deletions(-)

diff --git a/pixiu/pkg/common/shutdown/shutdown_signal.go 
b/pixiu/pkg/common/shutdown/shutdown_signal.go
new file mode 100644
index 00000000..6aca490e
--- /dev/null
+++ b/pixiu/pkg/common/shutdown/shutdown_signal.go
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package shutdown
+
+import (
+       "os"
+       "syscall"
+)
+
+var (
+       // ShutdownSignals lists the signals that should trigger a graceful
+       // shutdown of the process.
+       //
+       // SIGKILL (os.Kill) and SIGSTOP are deliberately not listed: the
+       // os/signal package cannot intercept them, so registering them has no
+       // effect. os.Interrupt is the same signal as syscall.SIGINT and is
+       // therefore listed only once.
+       ShutdownSignals = []os.Signal{
+               syscall.SIGHUP, syscall.SIGINT, syscall.SIGQUIT, syscall.SIGILL,
+               syscall.SIGTRAP, syscall.SIGABRT, syscall.SIGSYS, syscall.SIGTERM,
+       }
+
+       // DumpHeapShutdownSignals is the subset of ShutdownSignals for which a
+       // dump is expected to be written before the process exits.
+       DumpHeapShutdownSignals = []os.Signal{
+               syscall.SIGQUIT, syscall.SIGILL,
+               syscall.SIGTRAP, syscall.SIGABRT, syscall.SIGSYS,
+       }
+)
diff --git a/pixiu/pkg/listener/http/http_listener.go 
b/pixiu/pkg/listener/http/http_listener.go
index 70eb27a3..c4c8442c 100644
--- a/pixiu/pkg/listener/http/http_listener.go
+++ b/pixiu/pkg/listener/http/http_listener.go
@@ -18,10 +18,12 @@
 package http
 
 import (
+       "context"
        "fmt"
        "log"
        "net/http"
        "strconv"
+       "sync"
        "time"
 )
 
@@ -32,6 +34,7 @@ import (
 )
 
 import (
+       "github.com/apache/dubbo-go-pixiu/pixiu/pkg/config"
        "github.com/apache/dubbo-go-pixiu/pixiu/pkg/filterchain"
        "github.com/apache/dubbo-go-pixiu/pixiu/pkg/listener"
        "github.com/apache/dubbo-go-pixiu/pixiu/pkg/logger"
@@ -84,9 +87,17 @@ func (ls *HttpListenerService) Close() error {
        return ls.srv.Close()
 }
 
-func (ls *HttpListenerService) ShutDown() error {
-       //TODO implement me
-       panic("implement me")
+// ShutDown gracefully stops the HTTP server, waiting at most the configured
+// shutdown timeout for in-flight requests to finish.
+//
+// wg is expected to be a *sync.WaitGroup on which the caller has already
+// called Add(1); it is released exactly once on every return path. Any other
+// value (including nil) is ignored, so ShutDown can also be called standalone.
+//
+// It returns nil when the configured timeout is not positive (graceful
+// shutdown disabled), otherwise the error reported by the server's Shutdown.
+func (ls *HttpListenerService) ShutDown(wg interface{}) error {
+       // Registered before the early return below: skipping Done there would
+       // leave a caller blocked in Wait forever.
+       if group, ok := wg.(*sync.WaitGroup); ok {
+               defer group.Done()
+       }
+       timeout := config.GetBootstrap().GetShutdownConfig().GetTimeout()
+       if timeout <= 0 {
+               return nil
+       }
+       ctx, cancel := context.WithTimeout(context.Background(), timeout)
+       // Deferred calls run last-in-first-out, so the context is cancelled
+       // before the WaitGroup is released.
+       defer cancel()
+       return ls.srv.Shutdown(ctx)
+}
 
 func (ls *HttpListenerService) Refresh(c model.Listener) error {
diff --git a/pixiu/pkg/listener/http2/http2_listener.go 
b/pixiu/pkg/listener/http2/http2_listener.go
index ee281b5a..73a067de 100644
--- a/pixiu/pkg/listener/http2/http2_listener.go
+++ b/pixiu/pkg/listener/http2/http2_listener.go
@@ -21,6 +21,8 @@ import (
        "net"
        "net/http"
        "strconv"
+       "sync"
+       "time"
 )
 
 import (
@@ -29,6 +31,7 @@ import (
 )
 
 import (
+       "github.com/apache/dubbo-go-pixiu/pixiu/pkg/config"
        "github.com/apache/dubbo-go-pixiu/pixiu/pkg/filterchain"
        "github.com/apache/dubbo-go-pixiu/pixiu/pkg/listener"
        "github.com/apache/dubbo-go-pixiu/pixiu/pkg/logger"
@@ -43,13 +46,15 @@ type (
        // Http2ListenerService the facade of a listener
        Http2ListenerService struct {
                listener.BaseListenerService
-               listener net.Listener
-               server   *http.Server
+               listener        net.Listener
+               server          *http.Server
+               gShutdownConfig *listener.ListenerGracefulShutdownConfig
        }
 )
 
 type handleWrapper struct {
-       fc *filterchain.NetworkFilterChain
+       fc              *filterchain.NetworkFilterChain
+       gShutdownConfig *listener.ListenerGracefulShutdownConfig
 }
 
 type h2cWrapper struct {
@@ -64,6 +69,12 @@ func (h *h2cWrapper) ServeHTTP(w http.ResponseWriter, r 
*http.Request) {
 
 // ServeHTTP call FilterChain to handle http request and response.
 func (h *handleWrapper) ServeHTTP(w http.ResponseWriter, r *http.Request) {
+       h.gShutdownConfig.AddActiveCount(1) // count this request as in flight; ShutDown polls the counter
+       defer h.gShutdownConfig.AddActiveCount(-1)
+       if h.gShutdownConfig.RejectRequest { // flag is set by ShutDown once graceful shutdown has begun
+               http.Error(w, "Pixiu is preparing to close, reject all new 
requests", http.StatusInternalServerError)
+               return
+       }
        h.fc.ServeHTTP(w, r)
 }
 
@@ -74,8 +85,9 @@ func newHttp2ListenerService(lc *model.Listener, bs 
*model.Bootstrap) (listener.
                        Config:      lc,
                        FilterChain: fc,
                },
-               listener: nil,
-               server:   nil,
+               listener:        nil,
+               server:          nil,
+               gShutdownConfig: &listener.ListenerGracefulShutdownConfig{},
        }, nil
 }
 
@@ -91,7 +103,10 @@ func (ls *Http2ListenerService) Start() error {
        }
        ls.listener = l
 
-       handlerWrapper := &handleWrapper{ls.FilterChain}
+       handlerWrapper := &handleWrapper{
+               fc:              ls.FilterChain,
+               gShutdownConfig: ls.gShutdownConfig,
+       }
        h2s := &http2.Server{}
        h := &h2cWrapper{
                w: handlerWrapper,
@@ -119,9 +134,22 @@ func (ls *Http2ListenerService) Close() error {
        return ls.server.Close()
 }
 
-func (ls *Http2ListenerService) ShutDown() error {
-       //TODO implement me
-       panic("implement me")
+// ShutDown gracefully stops the HTTP/2 listener: it starts rejecting new
+// requests, waits until no request is active or the configured shutdown
+// timeout has elapsed, and then closes the server.
+//
+// wg is expected to be a *sync.WaitGroup on which the caller has already
+// called Add(1); it is released exactly once on every return path, and only
+// after the server has been closed. Any other value (including nil) is
+// ignored.
+//
+// It always returns nil; a failure to close the server is logged.
+func (ls *Http2ListenerService) ShutDown(wg interface{}) error {
+       // Deferred so that Done runs on the early return as well, and only
+       // after the server is closed: a caller blocked in Wait may terminate
+       // the process as soon as the WaitGroup is released.
+       if group, ok := wg.(*sync.WaitGroup); ok {
+               defer group.Done()
+       }
+       timeout := config.GetBootstrap().GetShutdownConfig().GetTimeout()
+       if timeout <= 0 {
+               return nil
+       }
+       // stop accept request
+       // NOTE(review): RejectRequest and ActiveCount are accessed here without
+       // synchronization while request handlers touch them concurrently;
+       // consider atomic loads/stores.
+       ls.gShutdownConfig.RejectRequest = true
+       deadline := time.Now().Add(timeout)
+       for time.Now().Before(deadline) && ls.gShutdownConfig.ActiveCount > 0 {
+               // sleep 100 ms and check it again
+               time.Sleep(100 * time.Millisecond)
+               logger.Infof("waiting for active invocation count = %d", ls.gShutdownConfig.ActiveCount)
+       }
+       // The constructor leaves server nil, so there is nothing to close if
+       // the listener was never started.
+       if ls.server == nil {
+               return nil
+       }
+       // The close error is logged rather than returned so that this method
+       // keeps returning nil, as it did before.
+       if err := ls.server.Close(); err != nil {
+               logger.Errorf("close http2 server: %v", err)
+       }
+       return nil
+}
 
 func (ls *Http2ListenerService) Refresh(c model.Listener) error {
diff --git a/pixiu/pkg/listener/listener.go b/pixiu/pkg/listener/listener.go
index 6649003a..01b78460 100644
--- a/pixiu/pkg/listener/listener.go
+++ b/pixiu/pkg/listener/listener.go
@@ -17,6 +17,10 @@
 
 package listener
 
+import (
+       "sync/atomic"
+)
+
 import (
        "github.com/pkg/errors"
 )
@@ -35,7 +39,7 @@ type (
                // Close the listener service forcefully
                Close() error
                // ShutDown gracefully shuts down the listener.
-               ShutDown() error
+               ShutDown(interface{}) error
                // Refresh config
                Refresh(model.Listener) error
        }
@@ -44,6 +48,11 @@ type (
                Config      *model.Listener
                FilterChain *filterchain.NetworkFilterChain
        }
+
+       ListenerGracefulShutdownConfig struct {
+               ActiveCount   int32
+               RejectRequest bool
+       }
 )
 
 // SetListenerServiceFactory will store the listenerService factory by name
@@ -62,3 +71,7 @@ func CreateListenerService(lc *model.Listener, bs 
*model.Bootstrap) (ListenerSer
        }
        return nil, errors.New("Registry " + lc.ProtocolStr + " does not 
support yet")
 }
+
+// AddActiveCount atomically adds num to ActiveCount. Pass a negative value to
+// decrement the counter.
+func (c *ListenerGracefulShutdownConfig) AddActiveCount(num int32) {
+       counter := &c.ActiveCount
+       atomic.AddInt32(counter, num)
+}
diff --git a/pixiu/pkg/listener/tcp/server_handler.go 
b/pixiu/pkg/listener/tcp/server_handler.go
index 08bb4ae7..12eafc52 100644
--- a/pixiu/pkg/listener/tcp/server_handler.go
+++ b/pixiu/pkg/listener/tcp/server_handler.go
@@ -122,9 +122,12 @@ func (h *ServerHandler) OnClose(session getty.Session) {
 
 // OnMessage called when session receive new pkg
 func (h *ServerHandler) OnMessage(session getty.Session, pkg interface{}) {
+       h.ls.gShutdownConfig.AddActiveCount(1)
+       defer h.ls.gShutdownConfig.AddActiveCount(-1)
+
        h.rwlock.Lock()
        if _, ok := h.sessionMap[session]; ok {
-               h.sessionMap[session].reqNum++
+               h.sessionMap[session].AddReqNum(1)
        }
        h.rwlock.Unlock()
 
@@ -182,6 +185,15 @@ func (h *ServerHandler) OnMessage(session getty.Session, 
pkg interface{}) {
                }
        }()
 
+       if h.ls.gShutdownConfig.RejectRequest {
+               err := perrors.Errorf("Pixiu is preparing to close, reject all 
new requests")
+               resp.Result = protocol.RPCResult{
+                       Err: err,
+               }
+               reply(session, resp)
+               return
+       }
+
        invoc, ok := req.Data.(*invocation.RPCInvocation)
        if !ok {
                panic("create invocation occur some exception for the type is 
not suitable one.")
diff --git a/pixiu/pkg/listener/tcp/tcp_listener.go 
b/pixiu/pkg/listener/tcp/tcp_listener.go
index da5ad8d5..0fdb4d8f 100644
--- a/pixiu/pkg/listener/tcp/tcp_listener.go
+++ b/pixiu/pkg/listener/tcp/tcp_listener.go
@@ -21,6 +21,7 @@ package tcp
 import (
        "fmt"
        "net"
+       "sync"
        "time"
 )
 
@@ -29,8 +30,10 @@ import (
 )
 
 import (
+       "github.com/apache/dubbo-go-pixiu/pixiu/pkg/config"
        "github.com/apache/dubbo-go-pixiu/pixiu/pkg/filterchain"
        "github.com/apache/dubbo-go-pixiu/pixiu/pkg/listener"
+       "github.com/apache/dubbo-go-pixiu/pixiu/pkg/logger"
        "github.com/apache/dubbo-go-pixiu/pixiu/pkg/model"
 )
 
@@ -42,7 +45,8 @@ type (
        // ListenerService the facade of a listener
        TcpListenerService struct {
                listener.BaseListenerService
-               server getty.Server
+               server          getty.Server
+               gShutdownConfig *listener.ListenerGracefulShutdownConfig
        }
 )
 
@@ -57,7 +61,8 @@ func newTcpListenerService(lc *model.Listener, bs 
*model.Bootstrap) (listener.Li
                        Config:      lc,
                        FilterChain: fc,
                },
-               server: server,
+               server:          server,
+               gShutdownConfig: &listener.ListenerGracefulShutdownConfig{},
        }, nil
 }
 
@@ -72,9 +77,22 @@ func (ls *TcpListenerService) Close() error {
        return nil
 }
 
-func (ls *TcpListenerService) ShutDown() error {
-       //TODO implement me
-       panic("implement me")
+// ShutDown gracefully stops the TCP listener: it starts rejecting new
+// requests, waits until no request is active or the configured shutdown
+// timeout has elapsed, and then closes the server.
+//
+// wg is expected to be a *sync.WaitGroup on which the caller has already
+// called Add(1); it is released exactly once on every return path, and only
+// after the server has been closed. Any other value (including nil) is
+// ignored.
+//
+// It always returns nil.
+func (ls *TcpListenerService) ShutDown(wg interface{}) error {
+       // Deferred so that Done runs on the early return as well, and only
+       // after the server is closed: a caller blocked in Wait may terminate
+       // the process as soon as the WaitGroup is released.
+       if group, ok := wg.(*sync.WaitGroup); ok {
+               defer group.Done()
+       }
+       timeout := config.GetBootstrap().GetShutdownConfig().GetTimeout()
+       if timeout <= 0 {
+               return nil
+       }
+       // stop accept request
+       // NOTE(review): RejectRequest and ActiveCount are accessed here without
+       // synchronization while message handlers touch them concurrently;
+       // consider atomic loads/stores.
+       ls.gShutdownConfig.RejectRequest = true
+       deadline := time.Now().Add(timeout)
+       for time.Now().Before(deadline) && ls.gShutdownConfig.ActiveCount > 0 {
+               // sleep 100 ms and check it again
+               time.Sleep(100 * time.Millisecond)
+               logger.Infof("waiting for active invocation count = %d", ls.gShutdownConfig.ActiveCount)
+       }
+       ls.server.Close()
+       return nil
+}
 
 func (ls *TcpListenerService) Refresh(c model.Listener) error {
diff --git a/pixiu/pkg/listener/triple/triple_listener.go 
b/pixiu/pkg/listener/triple/triple_listener.go
index db02be0f..18fd27ec 100644
--- a/pixiu/pkg/listener/triple/triple_listener.go
+++ b/pixiu/pkg/listener/triple/triple_listener.go
@@ -21,15 +21,19 @@ import (
        "context"
        "reflect"
        "sync"
+       "time"
 )
 
 import (
        tripleConstant "github.com/dubbogo/triple/pkg/common/constant"
        triConfig "github.com/dubbogo/triple/pkg/config"
        "github.com/dubbogo/triple/pkg/triple"
+
+       "github.com/pkg/errors"
 )
 
 import (
+       "github.com/apache/dubbo-go-pixiu/pixiu/pkg/config"
        "github.com/apache/dubbo-go-pixiu/pixiu/pkg/filterchain"
        "github.com/apache/dubbo-go-pixiu/pixiu/pkg/listener"
        "github.com/apache/dubbo-go-pixiu/pixiu/pkg/logger"
@@ -44,8 +48,9 @@ type (
        // ListenerService the facade of a listener
        TripleListenerService struct {
                listener.BaseListenerService
-               server     *triple.TripleServer
-               serviceMap *sync.Map
+               server          *triple.TripleServer
+               serviceMap      *sync.Map
+               gShutdownConfig *listener.ListenerGracefulShutdownConfig
        }
        // ProxyService grpc proxy service definition
        ProxyService struct {
@@ -62,6 +67,7 @@ func newTripleListenerService(lc *model.Listener, bs 
*model.Bootstrap) (listener
                        Config:      lc,
                        FilterChain: fc,
                },
+               gShutdownConfig: &listener.ListenerGracefulShutdownConfig{},
        }
 
        opts := []triConfig.OptionFunction{
@@ -93,9 +99,22 @@ func (ls *TripleListenerService) Close() error {
        return nil
 }
 
-func (ls *TripleListenerService) ShutDown() error {
-       //TODO implement me
-       panic("implement me")
+// ShutDown gracefully stops the triple listener: it starts rejecting new
+// invocations, waits until no invocation is active or the configured shutdown
+// timeout has elapsed, and then stops the server.
+//
+// wg is expected to be a *sync.WaitGroup on which the caller has already
+// called Add(1); it is released exactly once on every return path, and only
+// after the server has been stopped. Any other value (including nil) is
+// ignored.
+//
+// It always returns nil.
+func (ls *TripleListenerService) ShutDown(wg interface{}) error {
+       // Deferred so that Done runs on the early return as well, and only
+       // after the server is stopped: a caller blocked in Wait may terminate
+       // the process as soon as the WaitGroup is released.
+       if group, ok := wg.(*sync.WaitGroup); ok {
+               defer group.Done()
+       }
+       timeout := config.GetBootstrap().GetShutdownConfig().GetTimeout()
+       if timeout <= 0 {
+               return nil
+       }
+       // stop accept request
+       // NOTE(review): RejectRequest and ActiveCount are accessed here without
+       // synchronization while invocations touch them concurrently; consider
+       // atomic loads/stores.
+       ls.gShutdownConfig.RejectRequest = true
+       deadline := time.Now().Add(timeout)
+       for time.Now().Before(deadline) && ls.gShutdownConfig.ActiveCount > 0 {
+               // sleep 100 ms and check it again
+               time.Sleep(100 * time.Millisecond)
+               logger.Infof("waiting for active invocation count = %d", ls.gShutdownConfig.ActiveCount)
+       }
+       ls.server.Stop()
+       return nil
+}
 
 func (ls *TripleListenerService) Refresh(c model.Listener) error {
@@ -121,5 +140,10 @@ func (d *ProxyService) GetReqParamsInterfaces(methodName 
string) ([]interface{},
 
 // InvokeWithArgs called when rpc invocation comes
 func (d *ProxyService) InvokeWithArgs(ctx context.Context, methodName string, 
arguments []interface{}) (interface{}, error) {
+       d.ls.gShutdownConfig.AddActiveCount(1) // count this invocation as in flight; ShutDown polls the counter
+       defer d.ls.gShutdownConfig.AddActiveCount(-1)
+       if d.ls.gShutdownConfig.RejectRequest { // flag is set by ShutDown once graceful shutdown has begun
+               return nil, errors.Errorf("Pixiu is preparing to close, reject 
all new requests")
+       }
        return d.ls.FilterChain.OnTripleData(ctx, methodName, arguments)
 }
diff --git a/pixiu/pkg/model/bootstrap.go b/pixiu/pkg/model/bootstrap.go
index 084f3194..2b7fa5ee 100644
--- a/pixiu/pkg/model/bootstrap.go
+++ b/pixiu/pkg/model/bootstrap.go
@@ -17,6 +17,14 @@
 
 package model
 
+import (
+       "time"
+)
+
+import (
+       "github.com/apache/dubbo-go-pixiu/pixiu/pkg/logger"
+)
+
 // Bootstrap the door
 type Bootstrap struct {
        StaticResources  StaticResources   `yaml:"static_resources" 
json:"static_resources" mapstructure:"static_resources"`
@@ -42,6 +50,18 @@ func (bs *Bootstrap) GetStaticListeners() []*Listener {
        return bs.StaticResources.Listeners
 }
 
+// GetShutdownConfig returns the shutdown settings held in the static
+// resources. When none are set it stores, and returns, a default
+// configuration with "0s" timeouts and the "immediacy" reject policy.
+func (bs *Bootstrap) GetShutdownConfig() *ShutdownConfig {
+       if current := bs.StaticResources.ShutdownConfig; current != nil {
+               return current
+       }
+       fallback := &ShutdownConfig{
+               Timeout:      "0s",
+               StepTimeout:  "0s",
+               RejectPolicy: "immediacy",
+       }
+       bs.StaticResources.ShutdownConfig = fallback
+       return fallback
+}
+
 // GetPprof
 func (bs *Bootstrap) GetPprof() PprofConf {
        return bs.StaticResources.PprofConf
@@ -83,11 +103,23 @@ type DynamicResources struct {
 
 // ShutdownConfig how to shutdown server.
 type ShutdownConfig struct {
-       Timeout      string `default:"60s" yaml:"timeout" 
json:"timeout,omitempty"`
-       StepTimeout  string `default:"10s" yaml:"step_timeout" 
json:"step_timeout,omitempty"`
+       Timeout      string `default:"0s" yaml:"timeout" 
json:"timeout,omitempty"`
+       StepTimeout  string `default:"0s" yaml:"step_timeout" 
json:"step_timeout,omitempty"`
        RejectPolicy string `default:"immediacy" yaml:"reject_policy" 
json:"reject_policy,omitempty"`
 }
 
+// GetTimeout parses the Timeout field as a time.Duration. If the value
+// cannot be parsed, the problem is logged and 60 seconds is returned instead.
+func (sdc *ShutdownConfig) GetTimeout() time.Duration {
+       parsed, parseErr := time.ParseDuration(sdc.Timeout)
+       if parseErr == nil {
+               return parsed
+       }
+       fallback := 60 * time.Second
+       logger.Errorf("The Timeout configuration is invalid: %s, and we 
will use the default value: %s, err: %v",
+               sdc.Timeout, fallback.String(), parseErr)
+       return fallback
+}
+
 // APIMetaConfig how to find api config, file or etcd etc.
 type APIMetaConfig struct {
        Address       string `yaml:"address" json:"address,omitempty"`
diff --git a/pixiu/pkg/model/http.go b/pixiu/pkg/model/http.go
index daabaa4c..83c6a614 100644
--- a/pixiu/pkg/model/http.go
+++ b/pixiu/pkg/model/http.go
@@ -51,7 +51,7 @@ type HTTPFilter struct {
        Config map[string]interface{} `yaml:"config" json:"config" 
mapstructure:"config"`
 }
 
-// HTTPFilter http filter
+// DubboFilter dubbo filter
 type DubboFilter struct {
        Name   string                 `yaml:"name" json:"name" 
mapstructure:"name"`
        Config map[string]interface{} `yaml:"config" json:"config" 
mapstructure:"config"`
diff --git a/pixiu/pkg/server/listener_manager.go 
b/pixiu/pkg/server/listener_manager.go
index b42d33c9..aac881d5 100644
--- a/pixiu/pkg/server/listener_manager.go
+++ b/pixiu/pkg/server/listener_manager.go
@@ -18,9 +18,12 @@
 package server
 
 import (
+       "os"
+       "os/signal"
        "runtime/debug"
        "strconv"
        "sync"
+       "time"
 )
 
 import (
@@ -30,6 +33,7 @@ import (
 )
 
 import (
+       "github.com/apache/dubbo-go-pixiu/pixiu/pkg/common/shutdown"
        "github.com/apache/dubbo-go-pixiu/pixiu/pkg/listener"
        "github.com/apache/dubbo-go-pixiu/pixiu/pkg/logger"
        "github.com/apache/dubbo-go-pixiu/pixiu/pkg/model"
@@ -50,6 +54,8 @@ type ListenerManager struct {
        activeListenerService map[string]*wrapListenerService
        //readWriteLock
        rwLock *sync.RWMutex
+       //shutdownWaitGroup
+       shutdownWG *sync.WaitGroup
 }
 
 // CreateDefaultListenerManager create listener manager from config
@@ -68,11 +74,55 @@ func CreateDefaultListenerManager(bs *model.Bootstrap) 
*ListenerManager {
                }
        }
 
-       return &ListenerManager{
+       lm := &ListenerManager{
                activeListenerService: listeners,
                bootstrap:             bs,
                rwLock:                &sync.RWMutex{},
+               shutdownWG:            &sync.WaitGroup{},
        }
+       lm.gracefulShutdownInit()
+
+       return lm
+}
+
+// gracefulShutdownInit registers for the shutdown signals and starts the
+// goroutine that shuts all active listeners down once one of them arrives.
+// It does nothing when the configured shutdown timeout is not positive.
+func (lm *ListenerManager) gracefulShutdownInit() {
+       timeout := lm.bootstrap.GetShutdownConfig().GetTimeout()
+       if timeout <= 0 {
+               return
+       }
+       sigCh := make(chan os.Signal, 1)
+       signal.Notify(sigCh, shutdown.ShutdownSignals...)
+
+       go func() {
+               received := <-sigCh
+               logger.Infof("get signal %s, dubbo-go-pixiu will start 
shutdown.", received)
+
+               // Hard limit: exit the process once the timeout has elapsed,
+               // whether or not the listeners have finished.
+               time.AfterFunc(timeout, func() {
+                       logger.Warn("Shutdown gracefully timeout, listeners 
will shutdown immediately. ")
+                       os.Exit(0)
+               })
+
+               // Shut each listener down in its own goroutine; every ShutDown
+               // call receives the shared WaitGroup that is waited on below.
+               for _, wrapped := range lm.activeListenerService {
+                       lm.shutdownWG.Add(1)
+                       go func(target *wrapListenerService) {
+                               if err := target.ListenerService.ShutDown(lm.shutdownWG); err != nil {
+                                       logger.Errorf("Shutdown Error: %+v", err)
+                                       os.Exit(0)
+                               }
+                       }(wrapped)
+               }
+               lm.shutdownWG.Wait()
+
+               // Some of these signals originally terminate the process with
+               // a dump; to stay close to that behavior a heap dump is written
+               // to stdout for them before exiting.
+               for i := range shutdown.DumpHeapShutdownSignals {
+                       if shutdown.DumpHeapShutdownSignals[i] != received {
+                               continue
+                       }
+                       debug.WriteHeapDump(os.Stdout.Fd())
+               }
+               os.Exit(0)
+       }()
+}
 
 func resolveListenerName(c *model.Listener) string {

Reply via email to