This is an automated email from the ASF dual-hosted git repository.
alexstocks pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/dubbo-go.git
The following commit(s) were added to refs/heads/develop by this push:
new 7ba9017a5 feat: replace filter/graceful_shutdown's use of config
package to global package (#2906)
7ba9017a5 is described below
commit 7ba9017a5f0b9c433b9dffeebec29aa893a4ee4c
Author: Xuetao Li <[email protected]>
AuthorDate: Sat Jun 14 17:34:22 2025 +0800
feat: replace filter/graceful_shutdown's use of config package to global
package (#2906)
* fix: refactor graceful shutdown configuration to use global package
* merge to develop
* compat with old config
* update
* fix test
* update ci
* add test case for compat
* fix
* delete global variable
* use newOpts.Shutdown directly
---
client/options.go | 2 +-
config/graceful_shutdown.go | 12 ++--
.../graceful_shutdown}/compat.go | 5 +-
filter/graceful_shutdown/consumer_filter.go | 16 +++--
filter/graceful_shutdown/consumer_filter_test.go | 42 +++++++++---
filter/graceful_shutdown/provider_filter.go | 18 +++--
filter/graceful_shutdown/provider_filter_test.go | 60 +++++++++++++----
graceful_shutdown/options.go | 2 +-
graceful_shutdown/shutdown.go | 76 +++++++++++-----------
server/options.go | 2 +-
10 files changed, 151 insertions(+), 84 deletions(-)
diff --git a/client/options.go b/client/options.go
index f17e49dc4..4a70b7d11 100644
--- a/client/options.go
+++ b/client/options.go
@@ -545,7 +545,7 @@ func (cliOpts *ClientOptions) init(opts ...ClientOption)
error {
// todo(DMwangnima): is there any part that we should do compatibility
processing?
// init graceful_shutdown
-
graceful_shutdown.Init(graceful_shutdown.SetShutdown_Config(cliOpts.Shutdown))
+
graceful_shutdown.Init(graceful_shutdown.SetShutdownConfig(cliOpts.Shutdown))
return nil
}
diff --git a/config/graceful_shutdown.go b/config/graceful_shutdown.go
index e39b2b7e7..3baa0d0cc 100644
--- a/config/graceful_shutdown.go
+++ b/config/graceful_shutdown.go
@@ -55,19 +55,19 @@ const defaultShutDownTime = time.Second * 60
func gracefulShutdownInit() {
// retrieve ShutdownConfig for gracefulShutdownFilter
- cGracefulShutdownFilter, existcGracefulShutdownFilter :=
extension.GetFilter(constant.GracefulShutdownConsumerFilterKey)
- if !existcGracefulShutdownFilter {
+ gracefulShutdownConsumerFilter, exist :=
extension.GetFilter(constant.GracefulShutdownConsumerFilterKey)
+ if !exist {
return
}
- sGracefulShutdownFilter, existsGracefulShutdownFilter :=
extension.GetFilter(constant.GracefulShutdownProviderFilterKey)
- if !existsGracefulShutdownFilter {
+ gracefulShutdownProviderFilter, exist :=
extension.GetFilter(constant.GracefulShutdownProviderFilterKey)
+ if !exist {
return
}
- if filter, ok := cGracefulShutdownFilter.(Setter); ok &&
rootConfig.Shutdown != nil {
+ if filter, ok := gracefulShutdownConsumerFilter.(Setter); ok &&
rootConfig.Shutdown != nil {
filter.Set(constant.GracefulShutdownFilterShutdownConfig,
GetShutDown())
}
- if filter, ok := sGracefulShutdownFilter.(Setter); ok &&
rootConfig.Shutdown != nil {
+ if filter, ok := gracefulShutdownProviderFilter.(Setter); ok &&
rootConfig.Shutdown != nil {
filter.Set(constant.GracefulShutdownFilterShutdownConfig,
GetShutDown())
}
diff --git a/graceful_shutdown/compat.go b/filter/graceful_shutdown/compat.go
similarity index 92%
rename from graceful_shutdown/compat.go
rename to filter/graceful_shutdown/compat.go
index f8b80bbd8..ef9dc16e0 100644
--- a/graceful_shutdown/compat.go
+++ b/filter/graceful_shutdown/compat.go
@@ -26,11 +26,11 @@ import (
"dubbo.apache.org/dubbo-go/v3/global"
)
-func compatShutdownConfig(c *global.ShutdownConfig) *config.ShutdownConfig {
+func compatGlobalShutdownConfig(c *config.ShutdownConfig)
*global.ShutdownConfig {
if c == nil {
return nil
}
- cfg := &config.ShutdownConfig{
+ cfg := &global.ShutdownConfig{
Timeout: c.Timeout,
StepTimeout: c.StepTimeout,
ConsumerUpdateWaitTime: c.ConsumerUpdateWaitTime,
@@ -40,5 +40,6 @@ func compatShutdownConfig(c *global.ShutdownConfig)
*config.ShutdownConfig {
RejectRequest: atomic.Bool{},
}
cfg.RejectRequest.Store(c.RejectRequest.Load())
+
return cfg
}
diff --git a/filter/graceful_shutdown/consumer_filter.go
b/filter/graceful_shutdown/consumer_filter.go
index 86f0d2d18..de5ca3148 100644
--- a/filter/graceful_shutdown/consumer_filter.go
+++ b/filter/graceful_shutdown/consumer_filter.go
@@ -31,6 +31,7 @@ import (
"dubbo.apache.org/dubbo-go/v3/common/extension"
"dubbo.apache.org/dubbo-go/v3/config"
"dubbo.apache.org/dubbo-go/v3/filter"
+ "dubbo.apache.org/dubbo-go/v3/global"
"dubbo.apache.org/dubbo-go/v3/protocol/base"
"dubbo.apache.org/dubbo-go/v3/protocol/result"
)
@@ -49,7 +50,7 @@ func init() {
}
type consumerGracefulShutdownFilter struct {
- shutdownConfig *config.ShutdownConfig
+ shutdownConfig *global.ShutdownConfig
}
func newConsumerGracefulShutdownFilter() filter.Filter {
@@ -76,11 +77,16 @@ func (f *consumerGracefulShutdownFilter) OnResponse(ctx
context.Context, result
func (f *consumerGracefulShutdownFilter) Set(name string, conf any) {
switch name {
case constant.GracefulShutdownFilterShutdownConfig:
- if shutdownConfig, ok := conf.(*config.ShutdownConfig); ok {
- f.shutdownConfig = shutdownConfig
- return
+ switch ct := conf.(type) {
+ case *global.ShutdownConfig:
+ f.shutdownConfig = ct
+ // only for compatibility with old config, able to directly
remove after config is deleted
+ case *config.ShutdownConfig:
+ f.shutdownConfig = compatGlobalShutdownConfig(ct)
+ default:
+ logger.Warnf("the type of config for {%s} should be
*global.ShutdownConfig", constant.GracefulShutdownFilterShutdownConfig)
}
- logger.Warnf("the type of config for {%s} should be
*config.ShutdownConfig", constant.GracefulShutdownFilterShutdownConfig)
+ return
default:
// do nothing
}
diff --git a/filter/graceful_shutdown/consumer_filter_test.go
b/filter/graceful_shutdown/consumer_filter_test.go
index c1407c6b2..f910fe7b2 100644
--- a/filter/graceful_shutdown/consumer_filter_test.go
+++ b/filter/graceful_shutdown/consumer_filter_test.go
@@ -32,26 +32,48 @@ import (
"dubbo.apache.org/dubbo-go/v3/common/constant"
"dubbo.apache.org/dubbo-go/v3/common/extension"
"dubbo.apache.org/dubbo-go/v3/config"
+ "dubbo.apache.org/dubbo-go/v3/graceful_shutdown"
"dubbo.apache.org/dubbo-go/v3/protocol/base"
"dubbo.apache.org/dubbo-go/v3/protocol/invocation"
)
-func TestConusmerFilterInvoke(t *testing.T) {
- url := common.NewURLWithOptions(common.WithParams(url.Values{}))
- invocation := invocation.NewRPCInvocation("GetUser", []any{"OK"},
make(map[string]any))
+func TestConsumerFilterInvokeWithGlobalPackage(t *testing.T) {
+ var (
+ url =
common.NewURLWithOptions(common.WithParams(url.Values{}))
+ invocation = invocation.NewRPCInvocation("GetUser",
[]any{"OK"}, make(map[string]any))
+ opt = graceful_shutdown.NewOptions()
+ filterValue, _ =
extension.GetFilter(constant.GracefulShutdownConsumerFilterKey)
+ )
- rootConfig := config.NewRootConfigBuilder().
- SetShutDown(config.NewShutDownConfigBuilder().
- SetTimeout("60s").
- SetStepTimeout("3s").
- Build()).Build()
+ opt.Shutdown.RejectRequestHandler = "test"
+
+ filter := filterValue.(*consumerGracefulShutdownFilter)
+ filter.Set(constant.GracefulShutdownFilterShutdownConfig, opt.Shutdown)
+ assert.Equal(t, filter.shutdownConfig, opt.Shutdown)
+
+ result := filter.Invoke(context.Background(), base.NewBaseInvoker(url),
invocation)
+ assert.NotNil(t, result)
+ assert.Nil(t, result.Error())
+}
+
+// only for compatibility with old config, able to directly remove after
config is deleted
+func TestConsumerFilterInvokeWithConfigPackage(t *testing.T) {
+ var (
+ url =
common.NewURLWithOptions(common.WithParams(url.Values{}))
+ invocation = invocation.NewRPCInvocation("GetUser",
[]any{"OK"}, make(map[string]any))
+ rootConfig = config.NewRootConfigBuilder().
+ SetShutDown(config.NewShutDownConfigBuilder().
+ SetTimeout("60s").
+ SetStepTimeout("3s").
+ Build()).Build()
+ filterValue, _ =
extension.GetFilter(constant.GracefulShutdownConsumerFilterKey)
+ )
config.SetRootConfig(*rootConfig)
- filterValue, _ :=
extension.GetFilter(constant.GracefulShutdownConsumerFilterKey)
filter := filterValue.(*consumerGracefulShutdownFilter)
filter.Set(constant.GracefulShutdownFilterShutdownConfig,
config.GetShutDown())
- assert.Equal(t, filter.shutdownConfig, config.GetShutDown())
+ assert.Equal(t, filter.shutdownConfig,
compatGlobalShutdownConfig(config.GetShutDown()))
result := filter.Invoke(context.Background(), base.NewBaseInvoker(url),
invocation)
assert.NotNil(t, result)
diff --git a/filter/graceful_shutdown/provider_filter.go
b/filter/graceful_shutdown/provider_filter.go
index aab6937f4..9a9b40bba 100644
--- a/filter/graceful_shutdown/provider_filter.go
+++ b/filter/graceful_shutdown/provider_filter.go
@@ -32,6 +32,7 @@ import (
"dubbo.apache.org/dubbo-go/v3/common/extension"
"dubbo.apache.org/dubbo-go/v3/config"
"dubbo.apache.org/dubbo-go/v3/filter"
+ "dubbo.apache.org/dubbo-go/v3/global"
"dubbo.apache.org/dubbo-go/v3/protocol/base"
"dubbo.apache.org/dubbo-go/v3/protocol/result"
)
@@ -49,7 +50,7 @@ func init() {
}
type providerGracefulShutdownFilter struct {
- shutdownConfig *config.ShutdownConfig
+ shutdownConfig *global.ShutdownConfig
}
func newProviderGracefulShutdownFilter() filter.Filter {
@@ -61,7 +62,7 @@ func newProviderGracefulShutdownFilter() filter.Filter {
return psf
}
-// Invoke adds the requests count and block the new requests if application is
closing
+// Invoke adds the requests count and blocks the new requests if application
is closing
func (f *providerGracefulShutdownFilter) Invoke(ctx context.Context, invoker
base.Invoker, invocation base.Invocation) result.Result {
if f.rejectNewRequest() {
logger.Info("The application is closing, new request will be
rejected.")
@@ -90,11 +91,16 @@ func (f *providerGracefulShutdownFilter) OnResponse(ctx
context.Context, result
func (f *providerGracefulShutdownFilter) Set(name string, conf any) {
switch name {
case constant.GracefulShutdownFilterShutdownConfig:
- if shutdownConfig, ok := conf.(*config.ShutdownConfig); ok {
- f.shutdownConfig = shutdownConfig
- return
+ switch ct := conf.(type) {
+ case *global.ShutdownConfig:
+ f.shutdownConfig = ct
+ // only for compatibility with old config, able to directly
remove after config is deleted
+ case *config.ShutdownConfig:
+ f.shutdownConfig = compatGlobalShutdownConfig(ct)
+ default:
+ logger.Warnf("the type of config for {%s} should be
*global.ShutdownConfig", constant.GracefulShutdownFilterShutdownConfig)
}
- logger.Warnf("the type of config for {%s} should be
*config.ShutdownConfig", constant.GracefulShutdownFilterShutdownConfig)
+ return
default:
// do nothing
}
diff --git a/filter/graceful_shutdown/provider_filter_test.go
b/filter/graceful_shutdown/provider_filter_test.go
index d2821d959..10830f94a 100644
--- a/filter/graceful_shutdown/provider_filter_test.go
+++ b/filter/graceful_shutdown/provider_filter_test.go
@@ -35,42 +35,76 @@ import (
"dubbo.apache.org/dubbo-go/v3/common/extension"
"dubbo.apache.org/dubbo-go/v3/config"
"dubbo.apache.org/dubbo-go/v3/filter"
+ "dubbo.apache.org/dubbo-go/v3/graceful_shutdown"
"dubbo.apache.org/dubbo-go/v3/protocol/base"
"dubbo.apache.org/dubbo-go/v3/protocol/invocation"
"dubbo.apache.org/dubbo-go/v3/protocol/result"
)
-func TestProviderFilterInvoke(t *testing.T) {
- url := common.NewURLWithOptions(common.WithParams(url.Values{}))
- invocation := invocation.NewRPCInvocation("GetUser", []any{"OK"},
make(map[string]any))
+func TestProviderFilterInvokeWithGlobalPackage(t *testing.T) {
+ var (
+ url =
common.NewURLWithOptions(common.WithParams(url.Values{}))
+ invocation = invocation.NewRPCInvocation("GetUser",
[]any{"OK"}, make(map[string]any))
+ opt = graceful_shutdown.NewOptions()
+ filterValue, _ =
extension.GetFilter(constant.GracefulShutdownProviderFilterKey)
+ )
extension.SetRejectedExecutionHandler("test", func()
filter.RejectedExecutionHandler {
return &TestRejectedExecutionHandler{}
})
- rootConfig := config.NewRootConfigBuilder().
- SetShutDown(config.NewShutDownConfigBuilder().
- SetTimeout("60s").
- SetStepTimeout("3s").
- SetRejectRequestHandler("test").
- Build()).Build()
+ opt.Shutdown.RejectRequestHandler = "test"
+
+ filter := filterValue.(*providerGracefulShutdownFilter)
+ filter.Set(constant.GracefulShutdownFilterShutdownConfig, opt.Shutdown)
+ assert.Equal(t, filter.shutdownConfig, opt.Shutdown)
+
+ result := filter.Invoke(context.Background(), base.NewBaseInvoker(url),
invocation)
+ assert.NotNil(t, result)
+ assert.Nil(t, result.Error())
+
+ opt.Shutdown.RejectRequest.Store(true)
+ result = filter.Invoke(context.Background(), base.NewBaseInvoker(url),
invocation)
+ assert.NotNil(t, result)
+ assert.NotNil(t, result.Error().Error(), "Rejected")
+}
+
+// only for compatibility with old config, able to directly remove after
config is deleted
+func TestProviderFilterInvokeWithConfigPackage(t *testing.T) {
+ var (
+ url =
common.NewURLWithOptions(common.WithParams(url.Values{}))
+ invocation = invocation.NewRPCInvocation("GetUser",
[]any{"OK"}, make(map[string]any))
+ rootConfig = config.NewRootConfigBuilder().
+ SetShutDown(config.NewShutDownConfigBuilder().
+ SetTimeout("60s").
+ SetStepTimeout("3s").
+ SetRejectRequestHandler("test").
+ Build()).Build()
+ filterValue, _ =
extension.GetFilter(constant.GracefulShutdownProviderFilterKey)
+ )
+
+ extension.SetRejectedExecutionHandler("test", func()
filter.RejectedExecutionHandler {
+ return &TestRejectedExecutionHandler{}
+ })
config.SetRootConfig(*rootConfig)
- filterValue, _ :=
extension.GetFilter(constant.GracefulShutdownProviderFilterKey)
filter := filterValue.(*providerGracefulShutdownFilter)
filter.Set(constant.GracefulShutdownFilterShutdownConfig,
config.GetShutDown())
- assert.Equal(t, filter.shutdownConfig, config.GetShutDown())
+ assert.Equal(t, filter.shutdownConfig,
compatGlobalShutdownConfig(config.GetShutDown()))
result := filter.Invoke(context.Background(), base.NewBaseInvoker(url),
invocation)
assert.NotNil(t, result)
assert.Nil(t, result.Error())
- config.GetShutDown().RejectRequest.Store(true)
+ // only use this way to set the RejectRequest, because the variable is
compact to GlobalShutdownConfig
+ filter.shutdownConfig.RejectRequest.Store(true)
+ // not able to use this way to set the RejectRequest
+ //config.GetShutDown().RejectRequest.Store(true)
+
result = filter.Invoke(context.Background(), base.NewBaseInvoker(url),
invocation)
assert.NotNil(t, result)
assert.NotNil(t, result.Error().Error(), "Rejected")
-
}
type TestRejectedExecutionHandler struct{}
diff --git a/graceful_shutdown/options.go b/graceful_shutdown/options.go
index aacb9e869..522cc9ef6 100644
--- a/graceful_shutdown/options.go
+++ b/graceful_shutdown/options.go
@@ -92,7 +92,7 @@ func WithRejectRequest() Option {
// ---------- For framework ----------
-func SetShutdown_Config(cfg *global.ShutdownConfig) Option {
+func SetShutdownConfig(cfg *global.ShutdownConfig) Option {
return func(opts *Options) {
opts.Shutdown = cfg
}
diff --git a/graceful_shutdown/shutdown.go b/graceful_shutdown/shutdown.go
index edb0ce0b5..0f4ae3389 100644
--- a/graceful_shutdown/shutdown.go
+++ b/graceful_shutdown/shutdown.go
@@ -33,6 +33,7 @@ import (
"dubbo.apache.org/dubbo-go/v3/common/constant"
"dubbo.apache.org/dubbo-go/v3/common/extension"
"dubbo.apache.org/dubbo-go/v3/config"
+ "dubbo.apache.org/dubbo-go/v3/global"
)
const (
@@ -49,8 +50,7 @@ const (
)
var (
- initOnce sync.Once
- compatShutdown *config.ShutdownConfig
+ initOnce sync.Once
proMu sync.Mutex
protocols map[string]struct{}
@@ -63,38 +63,37 @@ func Init(opts ...Option) {
for _, opt := range opts {
opt(newOpts)
}
- compatShutdown = compatShutdownConfig(newOpts.Shutdown)
+
// retrieve ShutdownConfig for gracefulShutdownFilter
- cGracefulShutdownFilter, existcGracefulShutdownFilter :=
extension.GetFilter(constant.GracefulShutdownConsumerFilterKey)
- if !existcGracefulShutdownFilter {
+ gracefulShutdownConsumerFilter, exist :=
extension.GetFilter(constant.GracefulShutdownConsumerFilterKey)
+ if !exist {
return
}
- sGracefulShutdownFilter, existsGracefulShutdownFilter :=
extension.GetFilter(constant.GracefulShutdownProviderFilterKey)
- if !existsGracefulShutdownFilter {
+ gracefulShutdownProviderFilter, exist :=
extension.GetFilter(constant.GracefulShutdownProviderFilterKey)
+ if !exist {
return
}
- if filter, ok := cGracefulShutdownFilter.(config.Setter); ok {
-
filter.Set(constant.GracefulShutdownFilterShutdownConfig, compatShutdown)
+ if filter, ok :=
gracefulShutdownConsumerFilter.(config.Setter); ok {
+
filter.Set(constant.GracefulShutdownFilterShutdownConfig, newOpts.Shutdown)
}
- if filter, ok := sGracefulShutdownFilter.(config.Setter); ok {
-
filter.Set(constant.GracefulShutdownFilterShutdownConfig, compatShutdown)
+ if filter, ok :=
gracefulShutdownProviderFilter.(config.Setter); ok {
+
filter.Set(constant.GracefulShutdownFilterShutdownConfig, newOpts.Shutdown)
}
- if compatShutdown.InternalSignal != nil &&
*compatShutdown.InternalSignal {
+ if newOpts.Shutdown.InternalSignal != nil &&
*newOpts.Shutdown.InternalSignal {
signals := make(chan os.Signal, 1)
signal.Notify(signals, ShutdownSignals...)
go func() {
-
sig := <-signals
logger.Infof("get signal %s, applicationConfig
will shutdown.", sig)
// gracefulShutdownOnce.Do(func() {
- time.AfterFunc(totalTimeout(), func() {
+ time.AfterFunc(totalTimeout(newOpts.Shutdown),
func() {
logger.Warn("Shutdown gracefully
timeout, applicationConfig will shutdown immediately. ")
os.Exit(0)
})
- beforeShutdown()
+ beforeShutdown(newOpts.Shutdown)
// those signals' original behavior is exit
with dump ths stack, so we try to keep the behavior
for _, dumpSignal := range
DumpHeapShutdownSignals {
if sig == dumpSignal {
@@ -102,14 +101,13 @@ func Init(opts ...Option) {
}
}
os.Exit(0)
-
}()
}
})
}
// RegisterProtocol registers protocol which would be destroyed before
shutdown.
-// Please make sure that Init function has been invoked before, otherwise this
+// Please make sure that the Init function has been invoked before, otherwise
this
// function would not make any sense.
func RegisterProtocol(name string) {
proMu.Lock()
@@ -117,8 +115,8 @@ func RegisterProtocol(name string) {
proMu.Unlock()
}
-func totalTimeout() time.Duration {
- timeout := parseDuration(compatShutdown.Timeout, timeoutDesc,
defaultTimeout)
+func totalTimeout(shutdown *global.ShutdownConfig) time.Duration {
+ timeout := parseDuration(shutdown.Timeout, timeoutDesc, defaultTimeout)
if timeout < defaultTimeout {
timeout = defaultTimeout
}
@@ -126,14 +124,14 @@ func totalTimeout() time.Duration {
return timeout
}
-func beforeShutdown() {
+func beforeShutdown(shutdown *global.ShutdownConfig) {
destroyRegistries()
// waiting for a short time so that the clients have enough time to get
the notification that server shutdowns
// The value of configuration depends on how long the clients will get
notification.
- waitAndAcceptNewRequests()
+ waitAndAcceptNewRequests(shutdown)
- // reject sending/receiving the new request, but keeping waiting for
accepting requests
- waitForSendingAndReceivingRequests()
+ // reject sending/receiving the new request but keeping waiting for
accepting requests
+ waitForSendingAndReceivingRequests(shutdown)
// destroy all protocols
destroyProtocols()
@@ -152,54 +150,54 @@ func destroyRegistries() {
registryProtocol.Destroy()
}
-func waitAndAcceptNewRequests() {
+func waitAndAcceptNewRequests(shutdown *global.ShutdownConfig) {
logger.Info("Graceful shutdown --- Keep waiting and accept new requests
for a short time. ")
- updateWaitTime := parseDuration(compatShutdown.ConsumerUpdateWaitTime,
consumerUpdateWaitTimeDesc, defaultConsumerUpdateWaitTime)
+ updateWaitTime := parseDuration(shutdown.ConsumerUpdateWaitTime,
consumerUpdateWaitTimeDesc, defaultConsumerUpdateWaitTime)
time.Sleep(updateWaitTime)
- stepTimeout := parseDuration(compatShutdown.StepTimeout,
stepTimeoutDesc, defaultStepTimeout)
+ stepTimeout := parseDuration(shutdown.StepTimeout, stepTimeoutDesc,
defaultStepTimeout)
// ignore this step
if stepTimeout < 0 {
return
}
- waitingProviderProcessedTimeout(stepTimeout)
+ waitingProviderProcessedTimeout(shutdown, stepTimeout)
}
-func waitingProviderProcessedTimeout(timeout time.Duration) {
+func waitingProviderProcessedTimeout(shutdown *global.ShutdownConfig, timeout
time.Duration) {
deadline := time.Now().Add(timeout)
- offlineRequestWindowTimeout :=
parseDuration(compatShutdown.OfflineRequestWindowTimeout,
offlineRequestWindowTimeoutDesc, defaultOfflineRequestWindowTimeout)
+ offlineRequestWindowTimeout :=
parseDuration(shutdown.OfflineRequestWindowTimeout,
offlineRequestWindowTimeoutDesc, defaultOfflineRequestWindowTimeout)
for time.Now().Before(deadline) &&
- (compatShutdown.ProviderActiveCount.Load() > 0 ||
time.Now().Before(compatShutdown.ProviderLastReceivedRequestTime.Load().Add(offlineRequestWindowTimeout)))
{
+ (shutdown.ProviderActiveCount.Load() > 0 ||
time.Now().Before(shutdown.ProviderLastReceivedRequestTime.Load().Add(offlineRequestWindowTimeout)))
{
// sleep 10 ms and then we check it again
time.Sleep(10 * time.Millisecond)
logger.Infof("waiting for provider active invocation count =
%d, provider last received request time: %v",
- compatShutdown.ProviderActiveCount.Load(),
compatShutdown.ProviderLastReceivedRequestTime.Load())
+ shutdown.ProviderActiveCount.Load(),
shutdown.ProviderLastReceivedRequestTime.Load())
}
}
-// for provider. It will wait for processing receiving requests
-func waitForSendingAndReceivingRequests() {
+// For provider. It will wait for processing receiving requests
+func waitForSendingAndReceivingRequests(shutdown *global.ShutdownConfig) {
logger.Info("Graceful shutdown --- Keep waiting until sending/accepting
requests finish or timeout. ")
- compatShutdown.RejectRequest.Store(true)
- waitingConsumerProcessedTimeout()
+ shutdown.RejectRequest.Store(true)
+ waitingConsumerProcessedTimeout(shutdown)
}
-func waitingConsumerProcessedTimeout() {
- stepTimeout := parseDuration(compatShutdown.StepTimeout,
stepTimeoutDesc, defaultStepTimeout)
+func waitingConsumerProcessedTimeout(shutdown *global.ShutdownConfig) {
+ stepTimeout := parseDuration(shutdown.StepTimeout, stepTimeoutDesc,
defaultStepTimeout)
if stepTimeout <= 0 {
return
}
deadline := time.Now().Add(stepTimeout)
- for time.Now().Before(deadline) &&
compatShutdown.ConsumerActiveCount.Load() > 0 {
+ for time.Now().Before(deadline) && shutdown.ConsumerActiveCount.Load()
> 0 {
// sleep 10 ms and then we check it again
time.Sleep(10 * time.Millisecond)
- logger.Infof("waiting for consumer active invocation count =
%d", compatShutdown.ConsumerActiveCount.Load())
+ logger.Infof("waiting for consumer active invocation count =
%d", shutdown.ConsumerActiveCount.Load())
}
}
diff --git a/server/options.go b/server/options.go
index 9c6d48944..dbc561b85 100644
--- a/server/options.go
+++ b/server/options.go
@@ -108,7 +108,7 @@ func (srvOpts *ServerOptions) init(opts ...ServerOption)
error {
}
// init graceful_shutdown
-
graceful_shutdown.Init(graceful_shutdown.SetShutdown_Config(srvOpts.Shutdown))
+
graceful_shutdown.Init(graceful_shutdown.SetShutdownConfig(srvOpts.Shutdown))
return nil
}