This is an automated email from the ASF dual-hosted git repository.
alexstocks pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/dubbo-go-pixiu.git
The following commit(s) were added to refs/heads/develop by this push:
new 119744bb Add Graceful Shutdown (#474)
119744bb is described below
commit 119744bbcb12aeaeef6cdebabd1e346a1c56f4bd
Author: Nanno <[email protected]>
AuthorDate: Sun Sep 4 14:00:20 2022 +0800
Add Graceful Shutdown (#474)
---
pixiu/pkg/common/shutdown/shutdown_signal.go | 37 ++++++++++++++++++++
pixiu/pkg/listener/http/http_listener.go | 17 +++++++--
pixiu/pkg/listener/http2/http2_listener.go | 46 +++++++++++++++++++-----
pixiu/pkg/listener/listener.go | 15 +++++++-
pixiu/pkg/listener/tcp/server_handler.go | 14 +++++++-
pixiu/pkg/listener/tcp/tcp_listener.go | 28 ++++++++++++---
pixiu/pkg/listener/triple/triple_listener.go | 34 +++++++++++++++---
pixiu/pkg/model/bootstrap.go | 36 +++++++++++++++++--
pixiu/pkg/model/http.go | 2 +-
pixiu/pkg/server/listener_manager.go | 52 +++++++++++++++++++++++++++-
10 files changed, 253 insertions(+), 28 deletions(-)
diff --git a/pixiu/pkg/common/shutdown/shutdown_signal.go
b/pixiu/pkg/common/shutdown/shutdown_signal.go
new file mode 100644
index 00000000..6aca490e
--- /dev/null
+++ b/pixiu/pkg/common/shutdown/shutdown_signal.go
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package shutdown
+
+import (
+ "os"
+ "syscall"
+)
+
+var (
+ // ShutdownSignals receives shutdown signals to process
+ ShutdownSignals = []os.Signal{
+ os.Interrupt, os.Kill, syscall.SIGKILL, syscall.SIGSTOP,
+ syscall.SIGHUP, syscall.SIGINT, syscall.SIGQUIT,
syscall.SIGILL, syscall.SIGTRAP,
+ syscall.SIGABRT, syscall.SIGSYS, syscall.SIGTERM,
+ }
+
+ // DumpHeapShutdownSignals receives shutdown signals to process
+ DumpHeapShutdownSignals = []os.Signal{
+ syscall.SIGQUIT, syscall.SIGILL,
+ syscall.SIGTRAP, syscall.SIGABRT, syscall.SIGSYS,
+ }
+)
diff --git a/pixiu/pkg/listener/http/http_listener.go
b/pixiu/pkg/listener/http/http_listener.go
index 70eb27a3..c4c8442c 100644
--- a/pixiu/pkg/listener/http/http_listener.go
+++ b/pixiu/pkg/listener/http/http_listener.go
@@ -18,10 +18,12 @@
package http
import (
+ "context"
"fmt"
"log"
"net/http"
"strconv"
+ "sync"
"time"
)
@@ -32,6 +34,7 @@ import (
)
import (
+ "github.com/apache/dubbo-go-pixiu/pixiu/pkg/config"
"github.com/apache/dubbo-go-pixiu/pixiu/pkg/filterchain"
"github.com/apache/dubbo-go-pixiu/pixiu/pkg/listener"
"github.com/apache/dubbo-go-pixiu/pixiu/pkg/logger"
@@ -84,9 +87,17 @@ func (ls *HttpListenerService) Close() error {
return ls.srv.Close()
}
-func (ls *HttpListenerService) ShutDown() error {
- //TODO implement me
- panic("implement me")
+func (ls *HttpListenerService) ShutDown(wg interface{}) error {
+ timeout := config.GetBootstrap().GetShutdownConfig().GetTimeout()
+ if timeout <= 0 {
+ return nil
+ }
+ ctx, cancel := context.WithTimeout(context.Background(), timeout)
+ defer func() {
+ cancel()
+ wg.(*sync.WaitGroup).Done()
+ }()
+ return ls.srv.Shutdown(ctx)
}
func (ls *HttpListenerService) Refresh(c model.Listener) error {
diff --git a/pixiu/pkg/listener/http2/http2_listener.go
b/pixiu/pkg/listener/http2/http2_listener.go
index ee281b5a..73a067de 100644
--- a/pixiu/pkg/listener/http2/http2_listener.go
+++ b/pixiu/pkg/listener/http2/http2_listener.go
@@ -21,6 +21,8 @@ import (
"net"
"net/http"
"strconv"
+ "sync"
+ "time"
)
import (
@@ -29,6 +31,7 @@ import (
)
import (
+ "github.com/apache/dubbo-go-pixiu/pixiu/pkg/config"
"github.com/apache/dubbo-go-pixiu/pixiu/pkg/filterchain"
"github.com/apache/dubbo-go-pixiu/pixiu/pkg/listener"
"github.com/apache/dubbo-go-pixiu/pixiu/pkg/logger"
@@ -43,13 +46,15 @@ type (
// Http2ListenerService the facade of a listener
Http2ListenerService struct {
listener.BaseListenerService
- listener net.Listener
- server *http.Server
+ listener net.Listener
+ server *http.Server
+ gShutdownConfig *listener.ListenerGracefulShutdownConfig
}
)
type handleWrapper struct {
- fc *filterchain.NetworkFilterChain
+ fc *filterchain.NetworkFilterChain
+ gShutdownConfig *listener.ListenerGracefulShutdownConfig
}
type h2cWrapper struct {
@@ -64,6 +69,12 @@ func (h *h2cWrapper) ServeHTTP(w http.ResponseWriter, r
*http.Request) {
// ServeHTTP call FilterChain to handle http request and response.
func (h *handleWrapper) ServeHTTP(w http.ResponseWriter, r *http.Request) {
+ h.gShutdownConfig.AddActiveCount(1)
+ defer h.gShutdownConfig.AddActiveCount(-1)
+ if h.gShutdownConfig.RejectRequest {
+ http.Error(w, "Pixiu is preparing to close, reject all new
requests", http.StatusInternalServerError)
+ return
+ }
h.fc.ServeHTTP(w, r)
}
@@ -74,8 +85,9 @@ func newHttp2ListenerService(lc *model.Listener, bs
*model.Bootstrap) (listener.
Config: lc,
FilterChain: fc,
},
- listener: nil,
- server: nil,
+ listener: nil,
+ server: nil,
+ gShutdownConfig: &listener.ListenerGracefulShutdownConfig{},
}, nil
}
@@ -91,7 +103,10 @@ func (ls *Http2ListenerService) Start() error {
}
ls.listener = l
- handlerWrapper := &handleWrapper{ls.FilterChain}
+ handlerWrapper := &handleWrapper{
+ fc: ls.FilterChain,
+ gShutdownConfig: ls.gShutdownConfig,
+ }
h2s := &http2.Server{}
h := &h2cWrapper{
w: handlerWrapper,
@@ -119,9 +134,22 @@ func (ls *Http2ListenerService) Close() error {
return ls.server.Close()
}
-func (ls *Http2ListenerService) ShutDown() error {
- //TODO implement me
- panic("implement me")
+func (ls *Http2ListenerService) ShutDown(wg interface{}) error {
+ timeout := config.GetBootstrap().GetShutdownConfig().GetTimeout()
+ if timeout <= 0 {
+ return nil
+ }
+ // stop accept request
+ ls.gShutdownConfig.RejectRequest = true
+ deadline := time.Now().Add(timeout)
+ for time.Now().Before(deadline) && ls.gShutdownConfig.ActiveCount > 0 {
+ // sleep 100 ms and check it again
+ time.Sleep(100 * time.Millisecond)
+ logger.Infof("waiting for active invocation count = %d",
ls.gShutdownConfig.ActiveCount)
+ }
+ wg.(*sync.WaitGroup).Done()
+ ls.server.Close()
+ return nil
}
func (ls *Http2ListenerService) Refresh(c model.Listener) error {
diff --git a/pixiu/pkg/listener/listener.go b/pixiu/pkg/listener/listener.go
index 6649003a..01b78460 100644
--- a/pixiu/pkg/listener/listener.go
+++ b/pixiu/pkg/listener/listener.go
@@ -17,6 +17,10 @@
package listener
+import (
+ "sync/atomic"
+)
+
import (
"github.com/pkg/errors"
)
@@ -35,7 +39,7 @@ type (
// Close the listener service forcefully
Close() error
// ShutDown gracefully shuts down the listener.
- ShutDown() error
+ ShutDown(interface{}) error
// Refresh config
Refresh(model.Listener) error
}
@@ -44,6 +48,11 @@ type (
Config *model.Listener
FilterChain *filterchain.NetworkFilterChain
}
+
+ ListenerGracefulShutdownConfig struct {
+ ActiveCount int32
+ RejectRequest bool
+ }
)
// SetListenerServiceFactory will store the listenerService factory by name
@@ -62,3 +71,7 @@ func CreateListenerService(lc *model.Listener, bs
*model.Bootstrap) (ListenerSer
}
return nil, errors.New("Registry " + lc.ProtocolStr + " does not
support yet")
}
+
+func (lgsc *ListenerGracefulShutdownConfig) AddActiveCount(num int32) {
+ atomic.AddInt32(&lgsc.ActiveCount, num)
+}
diff --git a/pixiu/pkg/listener/tcp/server_handler.go
b/pixiu/pkg/listener/tcp/server_handler.go
index 08bb4ae7..12eafc52 100644
--- a/pixiu/pkg/listener/tcp/server_handler.go
+++ b/pixiu/pkg/listener/tcp/server_handler.go
@@ -122,9 +122,12 @@ func (h *ServerHandler) OnClose(session getty.Session) {
// OnMessage called when session receive new pkg
func (h *ServerHandler) OnMessage(session getty.Session, pkg interface{}) {
+ h.ls.gShutdownConfig.AddActiveCount(1)
+ defer h.ls.gShutdownConfig.AddActiveCount(-1)
+
h.rwlock.Lock()
if _, ok := h.sessionMap[session]; ok {
- h.sessionMap[session].reqNum++
+ h.sessionMap[session].AddReqNum(1)
}
h.rwlock.Unlock()
@@ -182,6 +185,15 @@ func (h *ServerHandler) OnMessage(session getty.Session,
pkg interface{}) {
}
}()
+ if h.ls.gShutdownConfig.RejectRequest {
+ err := perrors.Errorf("Pixiu is preparing to close, reject all
new requests")
+ resp.Result = protocol.RPCResult{
+ Err: err,
+ }
+ reply(session, resp)
+ return
+ }
+
invoc, ok := req.Data.(*invocation.RPCInvocation)
if !ok {
panic("create invocation occur some exception for the type is
not suitable one.")
diff --git a/pixiu/pkg/listener/tcp/tcp_listener.go
b/pixiu/pkg/listener/tcp/tcp_listener.go
index da5ad8d5..0fdb4d8f 100644
--- a/pixiu/pkg/listener/tcp/tcp_listener.go
+++ b/pixiu/pkg/listener/tcp/tcp_listener.go
@@ -21,6 +21,7 @@ package tcp
import (
"fmt"
"net"
+ "sync"
"time"
)
@@ -29,8 +30,10 @@ import (
)
import (
+ "github.com/apache/dubbo-go-pixiu/pixiu/pkg/config"
"github.com/apache/dubbo-go-pixiu/pixiu/pkg/filterchain"
"github.com/apache/dubbo-go-pixiu/pixiu/pkg/listener"
+ "github.com/apache/dubbo-go-pixiu/pixiu/pkg/logger"
"github.com/apache/dubbo-go-pixiu/pixiu/pkg/model"
)
@@ -42,7 +45,8 @@ type (
// ListenerService the facade of a listener
TcpListenerService struct {
listener.BaseListenerService
- server getty.Server
+ server getty.Server
+ gShutdownConfig *listener.ListenerGracefulShutdownConfig
}
)
@@ -57,7 +61,8 @@ func newTcpListenerService(lc *model.Listener, bs
*model.Bootstrap) (listener.Li
Config: lc,
FilterChain: fc,
},
- server: server,
+ server: server,
+ gShutdownConfig: &listener.ListenerGracefulShutdownConfig{},
}, nil
}
@@ -72,9 +77,22 @@ func (ls *TcpListenerService) Close() error {
return nil
}
-func (ls *TcpListenerService) ShutDown() error {
- //TODO implement me
- panic("implement me")
+func (ls *TcpListenerService) ShutDown(wg interface{}) error {
+ timeout := config.GetBootstrap().GetShutdownConfig().GetTimeout()
+ if timeout <= 0 {
+ return nil
+ }
+ // stop accept request
+ ls.gShutdownConfig.RejectRequest = true
+ deadline := time.Now().Add(timeout)
+ for time.Now().Before(deadline) && ls.gShutdownConfig.ActiveCount > 0 {
+ // sleep 100 ms and check it again
+ time.Sleep(100 * time.Millisecond)
+ logger.Infof("waiting for active invocation count = %d",
ls.gShutdownConfig.ActiveCount)
+ }
+ wg.(*sync.WaitGroup).Done()
+ ls.server.Close()
+ return nil
}
func (ls *TcpListenerService) Refresh(c model.Listener) error {
diff --git a/pixiu/pkg/listener/triple/triple_listener.go
b/pixiu/pkg/listener/triple/triple_listener.go
index db02be0f..18fd27ec 100644
--- a/pixiu/pkg/listener/triple/triple_listener.go
+++ b/pixiu/pkg/listener/triple/triple_listener.go
@@ -21,15 +21,19 @@ import (
"context"
"reflect"
"sync"
+ "time"
)
import (
tripleConstant "github.com/dubbogo/triple/pkg/common/constant"
triConfig "github.com/dubbogo/triple/pkg/config"
"github.com/dubbogo/triple/pkg/triple"
+
+ "github.com/pkg/errors"
)
import (
+ "github.com/apache/dubbo-go-pixiu/pixiu/pkg/config"
"github.com/apache/dubbo-go-pixiu/pixiu/pkg/filterchain"
"github.com/apache/dubbo-go-pixiu/pixiu/pkg/listener"
"github.com/apache/dubbo-go-pixiu/pixiu/pkg/logger"
@@ -44,8 +48,9 @@ type (
// ListenerService the facade of a listener
TripleListenerService struct {
listener.BaseListenerService
- server *triple.TripleServer
- serviceMap *sync.Map
+ server *triple.TripleServer
+ serviceMap *sync.Map
+ gShutdownConfig *listener.ListenerGracefulShutdownConfig
}
// ProxyService grpc proxy service definition
ProxyService struct {
@@ -62,6 +67,7 @@ func newTripleListenerService(lc *model.Listener, bs
*model.Bootstrap) (listener
Config: lc,
FilterChain: fc,
},
+ gShutdownConfig: &listener.ListenerGracefulShutdownConfig{},
}
opts := []triConfig.OptionFunction{
@@ -93,9 +99,22 @@ func (ls *TripleListenerService) Close() error {
return nil
}
-func (ls *TripleListenerService) ShutDown() error {
- //TODO implement me
- panic("implement me")
+func (ls *TripleListenerService) ShutDown(wg interface{}) error {
+ timeout := config.GetBootstrap().GetShutdownConfig().GetTimeout()
+ if timeout <= 0 {
+ return nil
+ }
+ // stop accept request
+ ls.gShutdownConfig.RejectRequest = true
+ deadline := time.Now().Add(timeout)
+ for time.Now().Before(deadline) && ls.gShutdownConfig.ActiveCount > 0 {
+ // sleep 100 ms and check it again
+ time.Sleep(100 * time.Millisecond)
+ logger.Infof("waiting for active invocation count = %d",
ls.gShutdownConfig.ActiveCount)
+ }
+ wg.(*sync.WaitGroup).Done()
+ ls.server.Stop()
+ return nil
}
func (ls *TripleListenerService) Refresh(c model.Listener) error {
@@ -121,5 +140,10 @@ func (d *ProxyService) GetReqParamsInterfaces(methodName
string) ([]interface{},
// InvokeWithArgs called when rpc invocation comes
func (d *ProxyService) InvokeWithArgs(ctx context.Context, methodName string,
arguments []interface{}) (interface{}, error) {
+ d.ls.gShutdownConfig.AddActiveCount(1)
+ defer d.ls.gShutdownConfig.AddActiveCount(-1)
+ if d.ls.gShutdownConfig.RejectRequest {
+ return nil, errors.Errorf("Pixiu is preparing to close, reject
all new requests")
+ }
return d.ls.FilterChain.OnTripleData(ctx, methodName, arguments)
}
diff --git a/pixiu/pkg/model/bootstrap.go b/pixiu/pkg/model/bootstrap.go
index 084f3194..2b7fa5ee 100644
--- a/pixiu/pkg/model/bootstrap.go
+++ b/pixiu/pkg/model/bootstrap.go
@@ -17,6 +17,14 @@
package model
+import (
+ "time"
+)
+
+import (
+ "github.com/apache/dubbo-go-pixiu/pixiu/pkg/logger"
+)
+
// Bootstrap the door
type Bootstrap struct {
StaticResources StaticResources `yaml:"static_resources"
json:"static_resources" mapstructure:"static_resources"`
@@ -42,6 +50,18 @@ func (bs *Bootstrap) GetStaticListeners() []*Listener {
return bs.StaticResources.Listeners
}
+// GetShutdownConfig
+func (bs *Bootstrap) GetShutdownConfig() *ShutdownConfig {
+ if bs.StaticResources.ShutdownConfig == nil {
+ bs.StaticResources.ShutdownConfig = &ShutdownConfig{
+ Timeout: "0s",
+ StepTimeout: "0s",
+ RejectPolicy: "immediacy",
+ }
+ }
+ return bs.StaticResources.ShutdownConfig
+}
+
// GetPprof
func (bs *Bootstrap) GetPprof() PprofConf {
return bs.StaticResources.PprofConf
@@ -83,11 +103,23 @@ type DynamicResources struct {
// ShutdownConfig how to shutdown server.
type ShutdownConfig struct {
- Timeout string `default:"60s" yaml:"timeout"
json:"timeout,omitempty"`
- StepTimeout string `default:"10s" yaml:"step_timeout"
json:"step_timeout,omitempty"`
+ Timeout string `default:"0s" yaml:"timeout"
json:"timeout,omitempty"`
+ StepTimeout string `default:"0s" yaml:"step_timeout"
json:"step_timeout,omitempty"`
RejectPolicy string `default:"immediacy" yaml:"reject_policy"
json:"reject_policy,omitempty"`
}
+// GetTimeoutOfShutdown
+func (sdc *ShutdownConfig) GetTimeout() time.Duration {
+ result, err := time.ParseDuration(sdc.Timeout)
+ if err != nil {
+ defaultTimeout := 60 * time.Second
+ logger.Errorf("The Timeout configuration is invalid: %s, and we
will use the default value: %s, err: %v",
+ sdc.Timeout, defaultTimeout.String(), err)
+ return defaultTimeout
+ }
+ return result
+}
+
// APIMetaConfig how to find api config, file or etcd etc.
type APIMetaConfig struct {
Address string `yaml:"address" json:"address,omitempty"`
diff --git a/pixiu/pkg/model/http.go b/pixiu/pkg/model/http.go
index daabaa4c..83c6a614 100644
--- a/pixiu/pkg/model/http.go
+++ b/pixiu/pkg/model/http.go
@@ -51,7 +51,7 @@ type HTTPFilter struct {
Config map[string]interface{} `yaml:"config" json:"config"
mapstructure:"config"`
}
-// HTTPFilter http filter
+// DubboFilter dubbo filter
type DubboFilter struct {
Name string `yaml:"name" json:"name"
mapstructure:"name"`
Config map[string]interface{} `yaml:"config" json:"config"
mapstructure:"config"`
diff --git a/pixiu/pkg/server/listener_manager.go
b/pixiu/pkg/server/listener_manager.go
index b42d33c9..aac881d5 100644
--- a/pixiu/pkg/server/listener_manager.go
+++ b/pixiu/pkg/server/listener_manager.go
@@ -18,9 +18,12 @@
package server
import (
+ "os"
+ "os/signal"
"runtime/debug"
"strconv"
"sync"
+ "time"
)
import (
@@ -30,6 +33,7 @@ import (
)
import (
+ "github.com/apache/dubbo-go-pixiu/pixiu/pkg/common/shutdown"
"github.com/apache/dubbo-go-pixiu/pixiu/pkg/listener"
"github.com/apache/dubbo-go-pixiu/pixiu/pkg/logger"
"github.com/apache/dubbo-go-pixiu/pixiu/pkg/model"
@@ -50,6 +54,8 @@ type ListenerManager struct {
activeListenerService map[string]*wrapListenerService
//readWriteLock
rwLock *sync.RWMutex
+ //shutdownWaitGroup
+ shutdownWG *sync.WaitGroup
}
// CreateDefaultListenerManager create listener manager from config
@@ -68,11 +74,55 @@ func CreateDefaultListenerManager(bs *model.Bootstrap)
*ListenerManager {
}
}
- return &ListenerManager{
+ lm := &ListenerManager{
activeListenerService: listeners,
bootstrap: bs,
rwLock: &sync.RWMutex{},
+ shutdownWG: &sync.WaitGroup{},
}
+ lm.gracefulShutdownInit()
+
+ return lm
+}
+
+func (lm *ListenerManager) gracefulShutdownInit() {
+ sdc := lm.bootstrap.GetShutdownConfig()
+ timeout := sdc.GetTimeout()
+ if timeout <= 0 {
+ return
+ }
+ signals := make(chan os.Signal, 1)
+ signal.Notify(signals, shutdown.ShutdownSignals...)
+
+ go func() {
+ sig := <-signals
+ logger.Infof("get signal %s, dubbo-go-pixiu will start
shutdown.", sig)
+
+ time.AfterFunc(timeout, func() {
+ logger.Warn("Shutdown gracefully timeout, listeners
will shutdown immediately. ")
+ os.Exit(0)
+ })
+
+ for _, listener := range lm.activeListenerService {
+ lm.shutdownWG.Add(1)
+ go func(listener *wrapListenerService) {
+ err :=
listener.ListenerService.ShutDown(lm.shutdownWG)
+ if err != nil {
+ logger.Errorf("Shutdown Error: %+v",
err)
+ os.Exit(0)
+ }
+ }(listener)
+ }
+ lm.shutdownWG.Wait()
+
+ // those signals' original behavior is exit with dump ths
stack, so we try to keep the behavior
+ for _, dumpSignal := range shutdown.DumpHeapShutdownSignals {
+ if sig == dumpSignal {
+ debug.WriteHeapDump(os.Stdout.Fd())
+ }
+ }
+ os.Exit(0)
+ }()
}
func resolveListenerName(c *model.Listener) string {