This is an automated email from the ASF dual-hosted git repository.

laurence pushed a commit to branch config-enhance
in repository https://gitbox.apache.org/repos/asf/dubbo-go.git


The following commit(s) were added to refs/heads/config-enhance by this push:
     new 0697950  feat(*): add graceful shutdown (#1470)
0697950 is described below

commit 0697950e18505e3989d738fb2750d3dc67e60587
Author: Mulavar <[email protected]>
AuthorDate: Fri Sep 24 13:50:46 2021 +0800

    feat(*): add graceful shutdown (#1470)
    
    * feat(*): add graceful shutdown
    
    * feat(*): merge destroyProviderProtocols and destroyConsumerProtocols
    
    Co-authored-by: dongjianhui03 <[email protected]>
---
 common/constant/key.go      |  39 ++++----
 config/config_setter.go     |   4 -
 config/graceful_shutdown.go | 234 +++++++++++++++++++-------------------------
 filter/gshutdown/filter.go  |   4 +-
 4 files changed, 124 insertions(+), 157 deletions(-)

diff --git a/common/constant/key.go b/common/constant/key.go
index 80d2fe7..bf8b3e1 100644
--- a/common/constant/key.go
+++ b/common/constant/key.go
@@ -65,25 +65,26 @@ const (
 
 // Filter Keys
 const (
-       AccessLogFilterKey                = "accesslog"
-       ActiveFilterKey                   = "active"
-       AuthConsumerFilterKey             = "sign"
-       AuthProviderFilterKey             = "auth"
-       EchoFilterKey                     = "echo"
-       ExecuteLimitFilterKey             = "execute"
-       GenericFilterKey                  = "generic"
-       GenericServiceFilterKey           = "generic_service"
-       GracefulShutdownProviderFilterKey = "pshutdown"
-       GracefulShutdownConsumerFilterKey = "cshutdown"
-       HystrixConsumerFilterKey          = "hystrix_consumer"
-       HystrixProviderFilterKey          = "hystrix_provider"
-       MetricsFilterKey                  = "metrics"
-       SeataFilterKey                    = "seata"
-       SentinelProviderFilterKey         = "sentinel-provider"
-       SentinelConsumerFilterKey         = "sentinel-consumer"
-       TokenFilterKey                    = "token"
-       TpsLimitFilterKey                 = "tps"
-       TracingFilterKey                  = "tracing"
+       AccessLogFilterKey                   = "accesslog"
+       ActiveFilterKey                      = "active"
+       AuthConsumerFilterKey                = "sign"
+       AuthProviderFilterKey                = "auth"
+       EchoFilterKey                        = "echo"
+       ExecuteLimitFilterKey                = "execute"
+       GenericFilterKey                     = "generic"
+       GenericServiceFilterKey              = "generic_service"
+       GracefulShutdownProviderFilterKey    = "pshutdown"
+       GracefulShutdownConsumerFilterKey    = "cshutdown"
+       GracefulShutdownFilterShutdownConfig = "GracefulShutdownFilterShutdownConfig"
+       HystrixConsumerFilterKey             = "hystrix_consumer"
+       HystrixProviderFilterKey             = "hystrix_provider"
+       MetricsFilterKey                     = "metrics"
+       SeataFilterKey                       = "seata"
+       SentinelProviderFilterKey            = "sentinel-provider"
+       SentinelConsumerFilterKey            = "sentinel-consumer"
+       TokenFilterKey                       = "token"
+       TpsLimitFilterKey                    = "tps"
+       TracingFilterKey                     = "tracing"
 )
 
 const (
diff --git a/config/config_setter.go b/config/config_setter.go
index 9f114f2..7f409e3 100644
--- a/config/config_setter.go
+++ b/config/config_setter.go
@@ -17,10 +17,6 @@
 
 package config
 
-const (
-       GracefulShutdownFilterShutdownConfig = "GracefulShutdownFilterShutdownConfig"
-)
-
 type Setter interface {
        Set(name string, config interface{})
 }
diff --git a/config/graceful_shutdown.go b/config/graceful_shutdown.go
index d37ad82..6c0b755 100644
--- a/config/graceful_shutdown.go
+++ b/config/graceful_shutdown.go
@@ -18,6 +18,9 @@
 package config
 
 import (
+       "os"
+       "os/signal"
+       "runtime/debug"
        "time"
 )
 
@@ -51,61 +54,49 @@ import (
 const defaultShutDownTime = time.Second * 60
 
 // nolint
-//func GracefulShutdownInit() {
-//     signals := make(chan os.Signal, 1)
-//
-//     signal.Notify(signals, ShutdownSignals...)
-//
-//     // retrieve ShutdownConfig for gracefulShutdownFilter
-//     if filter, ok := extension.GetFilter(constant.GracefulShutdownConsumerFilterKey).(config.Setter); ok && config.GetConsumerConfig().ShutdownConfig != nil {
-//             filter.Set(config.GracefulShutdownFilterShutdownConfig, config.GetConsumerConfig().ShutdownConfig)
-//     }
-//     if filter, ok := extension.GetFilter(constant.GracefulShutdownProviderFilterKey).(config.Setter); ok && config.GetProviderConfig().ShutdownConfig != nil {
-//             filter.Set(config.GracefulShutdownFilterShutdownConfig, config.GetProviderConfig().ShutdownConfig)
-//     }
-//
-//     go func() {
-//             select {
-//             case sig := <-signals:
-//                     logger.Infof("get signal %s, applicationConfig will shutdown.", sig)
-//                     // gracefulShutdownOnce.Do(func() {
-//                     time.AfterFunc(totalTimeout(), func() {
-//                             logger.Warn("Shutdown gracefully timeout, applicationConfig will shutdown immediately. ")
-//                             os.Exit(0)
-//                     })
-//                     BeforeShutdown()
-//                     // those signals' original behavior is exit with dump ths stack, so we try to keep the behavior
-//                     for _, dumpSignal := range DumpHeapShutdownSignals {
-//                             if sig == dumpSignal {
-//                                     debug.WriteHeapDump(os.Stdout.Fd())
-//                             }
-//                     }
-//                     os.Exit(0)
-//             }
-//     }()
-//}
+func GracefulShutdownInit() {
+       signals := make(chan os.Signal, 1)
+
+       signal.Notify(signals, ShutdownSignals...)
+
+       // retrieve ShutdownConfig for gracefulShutdownFilter
+       if filter, ok := extension.GetFilter(constant.GracefulShutdownConsumerFilterKey).(Setter); ok && rootConfig.Shutdown != nil {
+               filter.Set(constant.GracefulShutdownFilterShutdownConfig, rootConfig.Shutdown)
+       }
+
+       go func() {
+               select {
+               case sig := <-signals:
+                       logger.Infof("get signal %s, applicationConfig will shutdown.", sig)
+                       // gracefulShutdownOnce.Do(func() {
+                       time.AfterFunc(totalTimeout(), func() {
+                               logger.Warn("Shutdown gracefully timeout, applicationConfig will shutdown immediately. ")
+                               os.Exit(0)
+                       })
+                       BeforeShutdown()
+                       // those signals' original behavior is exit with dump ths stack, so we try to keep the behavior
+                       for _, dumpSignal := range DumpHeapShutdownSignals {
+                               if sig == dumpSignal {
+                                       debug.WriteHeapDump(os.Stdout.Fd())
+                               }
+                       }
+                       os.Exit(0)
+               }
+       }()
+}
 
 // BeforeShutdown provides processing flow before shutdown
 func BeforeShutdown() {
        destroyAllRegistries()
        // waiting for a short time so that the clients have enough time to get the notification that server shutdowns
        // The value of configuration depends on how long the clients will get notification.
-       //waitAndAcceptNewRequests()
-
-       // reject the new request, but keeping waiting for accepting requests
-       //waitForReceivingRequests()
-
-       // we fetch the protocols from Consumer.References. Consumer.ProtocolConfig doesn't contains all protocol, like jsonrpc
-       //consumerProtocols := getConsumerProtocols()
-
-       // If this applicationConfig is not the provider, it will do nothing
-       //destroyProviderProtocols(consumerProtocols)
+       waitAndAcceptNewRequests()
 
-       // reject sending the new request, and waiting for response of sending requests
-       //waitForSendingRequests()
+       // reject sending/receiving the new request, but keeping waiting for accepting requests
+       waitForSendingAndReceivingRequests()
 
-       // If this applicationConfig is not the consumer, it will do nothing
-       //destroyConsumerProtocols(consumerProtocols)
+       // destroy all protocols
+       destroyProtocols()
 
        logger.Info("Graceful shutdown --- Execute the custom callbacks.")
        customCallbacks := extension.GetAllCustomShutdownCallbacks()
@@ -120,68 +111,56 @@ func destroyAllRegistries() {
        registryProtocol.Destroy()
 }
 
-func destroyConsumerProtocols(consumerProtocols *gxset.HashSet) {
-       logger.Info("Graceful shutdown --- Destroy consumer's protocols. ")
+// destroyProtocols destroys protocols.
+// First we destroy provider's protocols, and then we destroy the consumer protocols.
+func destroyProtocols() {
+       logger.Info("Graceful shutdown --- Destroy protocols. ")
+       logger.Info("Graceful shutdown --- First destroy provider's protocols. ")
+
+       consumerProtocols := getConsumerProtocols()
+       if rootConfig.Protocols == nil {
+               return
+       }
+
+       for _, protocol := range rootConfig.Protocols {
+               // the protocol is the consumer's protocol too, we can not destroy it.
+               if consumerProtocols.Contains(protocol.Name) {
+                       continue
+               }
+               extension.GetProtocol(protocol.Name).Destroy()
+       }
+
+       logger.Info("Graceful shutdown --- Second destroy consumer's protocols. ")
        for name := range consumerProtocols.Items {
                extension.GetProtocol(name.(string)).Destroy()
        }
 }
 
-// destroyProviderProtocols destroys the provider's protocol.
-// if the protocol is consumer's protocol too, we will keep it
-//func destroyProviderProtocols(consumerProtocols *gxset.HashSet) {
-//     logger.Info("Graceful shutdown --- Destroy provider's protocols. ")
-//
-//     if config.providerConfig == nil || config.providerConfig.Protocols == nil {
-//             return
-//     }
-//
-//     for _, protocol := range config.providerConfig.Protocols {
-//
-//             // the protocol is the consumer's protocol too, we can not destroy it.
-//             if consumerProtocols.Contains(protocol.Name) {
-//                     continue
-//             }
-//             extension.GetProtocol(protocol.Name).Destroy()
-//     }
-//}
-
-//func waitAndAcceptNewRequests() {
-//     logger.Info("Graceful shutdown --- Keep waiting and accept new requests for a short time. ")
-//     if config.providerConfig == nil || config.providerConfig.ShutdownConfig == nil {
-//             return
-//     }
-//
-//     timeout := config.providerConfig.ShutdownConfig.GetStepTimeout()
-//
-//     // ignore this step
-//     if timeout < 0 {
-//             return
-//     }
-//     time.Sleep(timeout)
-//}
-
-// for provider. It will wait for processing receiving requests
-//func waitForReceivingRequests() {
-//     logger.Info("Graceful shutdown --- Keep waiting until accepting requests finish or timeout. ")
-//     if config.providerConfig == nil || config.providerConfig.ShutdownConfig == nil {
-//             // ignore this step
-//             return
-//     }
-//     config.providerConfig.ShutdownConfig.RejectRequest = true
-//     waitingProcessedTimeout(config.providerConfig.ShutdownConfig)
-//}
-
-// for consumer. It will wait for the response of sending requests
-//func waitForSendingRequests() {
-//     logger.Info("Graceful shutdown --- Keep waiting until sending requests getting response or timeout ")
-//     if config.consumerConfig == nil || config.consumerConfig.ShutdownConfig == nil {
-//             // ignore this step
-//             return
-//     }
-//     config.consumerConfig.ShutdownConfig.RejectRequest = true
-//     waitingProcessedTimeout(config.consumerConfig.ShutdownConfig)
-//}
+func waitAndAcceptNewRequests() {
+       logger.Info("Graceful shutdown --- Keep waiting and accept new requests for a short time. ")
+       if rootConfig.Shutdown == nil {
+               return
+       }
+
+       timeout := rootConfig.Shutdown.GetStepTimeout()
+
+       // ignore this step
+       if timeout < 0 {
+               return
+       }
+       time.Sleep(timeout)
+}
+
+//for provider. It will wait for processing receiving requests
+func waitForSendingAndReceivingRequests() {
+       logger.Info("Graceful shutdown --- Keep waiting until sending/accepting requests finish or timeout. ")
+       if rootConfig == nil || rootConfig.Shutdown == nil {
+               // ignore this step
+               return
+       }
+       rootConfig.Shutdown.RejectRequest = true
+       waitingProcessedTimeout(rootConfig.Shutdown)
+}
 
 func waitingProcessedTimeout(shutdownConfig *ShutdownConfig) {
        timeout := shutdownConfig.GetStepTimeout()
@@ -196,33 +175,24 @@ func waitingProcessedTimeout(shutdownConfig *ShutdownConfig) {
        }
 }
 
-//func totalTimeout() time.Duration {
-//     providerShutdown := defaultShutDownTime
-//     if config.providerConfig != nil && config.providerConfig.ShutdownConfig != nil {
-//             providerShutdown = config.providerConfig.ShutdownConfig.GetTimeout()
-//     }
-//
-//     var consumerShutdown time.Duration
-//     if config.consumerConfig != nil && config.consumerConfig.ShutdownConfig != nil {
-//             consumerShutdown = config.consumerConfig.ShutdownConfig.GetTimeout()
-//     }
-//
-//     timeout := providerShutdown
-//     if consumerShutdown > providerShutdown {
-//             timeout = consumerShutdown
-//     }
-//     return timeout
-//}
+func totalTimeout() time.Duration {
+       timeout := defaultShutDownTime
+       if rootConfig.Shutdown != nil && rootConfig.Shutdown.GetTimeout() > timeout {
+               timeout = rootConfig.Shutdown.GetTimeout()
+       }
+
+       return timeout
+}
 
 // we can not get the protocols from consumerConfig because some protocol don't have configuration, like jsonrpc.
-//func getConsumerProtocols() *gxset.HashSet {
-//     result := gxset.NewSet()
-//     if config.consumerConfig == nil || config.consumerConfig.References == nil {
-//             return result
-//     }
-//
-//     for _, reference := range config.consumerConfig.References {
-//             result.Add(reference.Protocol)
-//     }
-//     return result
-//}
+func getConsumerProtocols() *gxset.HashSet {
+       result := gxset.NewSet()
+       if rootConfig.Consumer == nil || rootConfig.Consumer.References == nil {
+               return result
+       }
+
+       for _, reference := range rootConfig.Consumer.References {
+               result.Add(reference.Protocol)
+       }
+       return result
+}
diff --git a/filter/gshutdown/filter.go b/filter/gshutdown/filter.go
index 1bbc197..7b4a905 100644
--- a/filter/gshutdown/filter.go
+++ b/filter/gshutdown/filter.go
@@ -68,12 +68,12 @@ func (f *Filter) OnResponse(ctx context.Context, result protocol.Result, invoker
 
 func (f *Filter) Set(name string, conf interface{}) {
        switch name {
-       case config.GracefulShutdownFilterShutdownConfig:
+       case constant.GracefulShutdownFilterShutdownConfig:
                if shutdownConfig, ok := conf.(*config.ShutdownConfig); !ok {
                        f.shutdownConfig = shutdownConfig
                        return
                }
-               logger.Warnf("the type of config for {%s} should be *config.ShutdownConfig", config.GracefulShutdownFilterShutdownConfig)
+               logger.Warnf("the type of config for {%s} should be *config.ShutdownConfig", constant.GracefulShutdownFilterShutdownConfig)
        default:
                // do nothing
        }

Reply via email to