This is an automated email from the ASF dual-hosted git repository.

alexstocks pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/dubbo-go.git


The following commit(s) were added to refs/heads/3.0 by this push:
     new 8aea261  fix tps filter panic bug(pre-check the configuration of tps at the st… (#1604)
8aea261 is described below

commit 8aea261fa57206dd6e80a057717beddcd18f32c4
Author: Mulavar <[email protected]>
AuthorDate: Sun Feb 13 12:02:28 2022 +0800

    fix tps filter panic bug(pre-check the configuration of tps at the st… (#1604)
    
    * fix tps filter panic bug(pre-check the configuration of tps at the start phase)
    
    * fix tps limit unit test
    
    * add exists flag to tps filter extension
    
    Co-authored-by: dongjianhui03 <[email protected]>
---
 common/constant/key.go                    |   4 +-
 common/extension/tps_limit.go             |  16 +-
 config/graceful_shutdown.go               |   2 +-
 config/method_config.go                   |  52 +++++--
 config/service_config.go                  | 237 ++++++++++++++++++------------
 filter/tps/filter.go                      |   7 +-
 filter/tps/limiter/method_service.go      |  21 ++-
 filter/tps/limiter/method_service_test.go |   3 +-
 8 files changed, 221 insertions(+), 121 deletions(-)

diff --git a/common/constant/key.go b/common/constant/key.go
index fdd9359..6480841 100644
--- a/common/constant/key.go
+++ b/common/constant/key.go
@@ -112,9 +112,9 @@ const (
        TPSLimiterKey                      = "tps.limiter"
        TPSRejectedExecutionHandlerKey     = "tps.limit.rejected.handler"
        TPSLimitRateKey                    = "tps.limit.rate"
-       DefaultTPSLimitRate                = "-1"
+       DefaultTPSLimitRate                = -1
        TPSLimitIntervalKey                = "tps.limit.interval"
-       DefaultTPSLimitInterval            = "60000"
+       DefaultTPSLimitInterval            = -1
        TPSLimitStrategyKey                = "tps.limit.strategy"
        ExecuteLimitKey                    = "execute.limit"
        DefaultExecuteLimit                = "-1"
diff --git a/common/extension/tps_limit.go b/common/extension/tps_limit.go
index e5ff81e..8f678d1 100644
--- a/common/extension/tps_limit.go
+++ b/common/extension/tps_limit.go
@@ -18,6 +18,10 @@
 package extension
 
 import (
+       "errors"
+)
+
+import (
        "dubbo.apache.org/dubbo-go/v3/filter"
 )
 
@@ -32,13 +36,13 @@ func SetTpsLimiter(name string, creator func() 
filter.TpsLimiter) {
 }
 
 // GetTpsLimiter finds the TpsLimiter with @name
-func GetTpsLimiter(name string) filter.TpsLimiter {
+func GetTpsLimiter(name string) (filter.TpsLimiter, error) {
        creator, ok := tpsLimiter[name]
        if !ok {
-               panic("TpsLimiter for " + name + " is not existing, make sure 
you have import the package " +
+               return nil, errors.New("TpsLimiter for " + name + " is not 
existing, make sure you have import the package " +
                        "and you have register it by invoking 
extension.SetTpsLimiter.")
        }
-       return creator()
+       return creator(), nil
 }
 
 // SetTpsLimitStrategy sets the TpsLimitStrategyCreator with @name
@@ -47,11 +51,11 @@ func SetTpsLimitStrategy(name string, creator 
filter.TpsLimitStrategyCreator) {
 }
 
 // GetTpsLimitStrategyCreator finds the TpsLimitStrategyCreator with @name
-func GetTpsLimitStrategyCreator(name string) filter.TpsLimitStrategyCreator {
+func GetTpsLimitStrategyCreator(name string) (filter.TpsLimitStrategyCreator, 
error) {
        creator, ok := tpsLimitStrategy[name]
        if !ok {
-               panic("TpsLimitStrategy for " + name + " is not existing, make 
sure you have import the package " +
+               return nil, errors.New("TpsLimitStrategy for " + name + " is 
not existing, make sure you have import the package " +
                        "and you have register it by invoking 
extension.SetTpsLimitStrategy.")
        }
-       return creator
+       return creator, nil
 }
diff --git a/config/graceful_shutdown.go b/config/graceful_shutdown.go
index 98d5ac6..01ebbbc 100644
--- a/config/graceful_shutdown.go
+++ b/config/graceful_shutdown.go
@@ -195,7 +195,7 @@ func waitForSendingAndReceivingRequests() {
                // ignore this step
                return
        }
-       rootConfig.Shutdown.RejectRequest = true
+       rootConfig.Shutdown.RejectRequest.Store(true)
        waitingConsumerProcessedTimeout(rootConfig.Shutdown)
 }
 
diff --git a/config/method_config.go b/config/method_config.go
index 4e45987..093df5d 100644
--- a/config/method_config.go
+++ b/config/method_config.go
@@ -18,11 +18,17 @@
 package config
 
 import (
+       "fmt"
+       "strconv"
+)
+
+import (
        "github.com/creasty/defaults"
 )
 
 import (
        "dubbo.apache.org/dubbo-go/v3/common/constant"
+       "dubbo.apache.org/dubbo-go/v3/common/extension"
 )
 
 // MethodConfig defines method config
@@ -43,16 +49,16 @@ type MethodConfig struct {
 }
 
 // nolint
-func (mc *MethodConfig) Prefix() string {
-       if len(mc.InterfaceId) != 0 {
-               return constant.Dubbo + "." + mc.InterfaceName + "." + 
mc.InterfaceId + "." + mc.Name + "."
+func (m *MethodConfig) Prefix() string {
+       if len(m.InterfaceId) != 0 {
+               return constant.Dubbo + "." + m.InterfaceName + "." + 
m.InterfaceId + "." + m.Name + "."
        }
 
-       return constant.Dubbo + "." + mc.InterfaceName + "." + mc.Name + "."
+       return constant.Dubbo + "." + m.InterfaceName + "." + m.Name + "."
 }
 
-func (mc *MethodConfig) Init() error {
-       return mc.check()
+func (m *MethodConfig) Init() error {
+       return m.check()
 }
 
 func initProviderMethodConfig(sc *ServiceConfig) error {
@@ -70,9 +76,37 @@ func initProviderMethodConfig(sc *ServiceConfig) error {
 }
 
 // check set default value and verify
-func (mc *MethodConfig) check() error {
-       if err := defaults.Set(mc); err != nil {
+func (m *MethodConfig) check() error {
+       qualifieldMethodName := m.InterfaceName + "#" + m.Name
+       if m.TpsLimitStrategy != "" {
+               _, err := 
extension.GetTpsLimitStrategyCreator(m.TpsLimitStrategy)
+               if err != nil {
+                       panic(err)
+               }
+       }
+
+       if m.TpsLimitInterval != "" {
+               tpsLimitInterval, err := strconv.ParseInt(m.TpsLimitInterval, 
0, 0)
+               if err != nil {
+                       return fmt.Errorf("[MethodConfig] Cannot parse the 
configuration tps.limit.interval for method %s, please check your 
configuration", qualifieldMethodName)
+               }
+               if tpsLimitInterval < 0 {
+                       return fmt.Errorf("[MethodConfig] The configuration 
tps.limit.interval for %s must be positive, please check your configuration", 
qualifieldMethodName)
+               }
+       }
+
+       if m.TpsLimitRate != "" {
+               tpsLimitRate, err := strconv.ParseInt(m.TpsLimitRate, 0, 0)
+               if err != nil {
+                       return fmt.Errorf("[MethodConfig] Cannot parse the 
configuration tps.limit.rate for method %s, please check your configuration", 
qualifieldMethodName)
+               }
+               if tpsLimitRate < 0 {
+                       return fmt.Errorf("[MethodConfig] The configuration 
tps.limit.rate for method %s must be positive, please check your 
configuration", qualifieldMethodName)
+               }
+       }
+
+       if err := defaults.Set(m); err != nil {
                return err
        }
-       return verify(mc)
+       return verify(m)
 }
diff --git a/config/service_config.go b/config/service_config.go
index ff60946..a550812 100644
--- a/config/service_config.go
+++ b/config/service_config.go
@@ -95,49 +95,96 @@ type ServiceConfig struct {
 }
 
 // Prefix returns dubbo.service.${InterfaceName}.
-func (svc *ServiceConfig) Prefix() string {
-       return strings.Join([]string{constant.ServiceConfigPrefix, svc.id}, ".")
+func (s *ServiceConfig) Prefix() string {
+       return strings.Join([]string{constant.ServiceConfigPrefix, s.id}, ".")
 }
 
-func (svc *ServiceConfig) Init(rc *RootConfig) error {
-       if err := initProviderMethodConfig(svc); err != nil {
+func (s *ServiceConfig) Init(rc *RootConfig) error {
+       if err := initProviderMethodConfig(s); err != nil {
                return err
        }
-       if err := defaults.Set(svc); err != nil {
+       if err := defaults.Set(s); err != nil {
                return err
        }
-       svc.exported = atomic.NewBool(false)
-       svc.metadataType = rc.Application.MetadataType
-       svc.unexported = atomic.NewBool(false)
-       svc.RCRegistriesMap = rc.Registries
-       svc.RCProtocolsMap = rc.Protocols
+       s.exported = atomic.NewBool(false)
+       s.metadataType = rc.Application.MetadataType
+       s.unexported = atomic.NewBool(false)
+       s.RCRegistriesMap = rc.Registries
+       s.RCProtocolsMap = rc.Protocols
        if rc.Provider != nil {
-               svc.ProxyFactoryKey = rc.Provider.ProxyFactory
+               s.ProxyFactoryKey = rc.Provider.ProxyFactory
        }
-       svc.RegistryIDs = translateRegistryIds(svc.RegistryIDs)
-       if len(svc.RegistryIDs) <= 0 {
-               svc.RegistryIDs = rc.Provider.RegistryIDs
+       s.RegistryIDs = translateRegistryIds(s.RegistryIDs)
+       if len(s.RegistryIDs) <= 0 {
+               s.RegistryIDs = rc.Provider.RegistryIDs
        }
-       if len(svc.ProtocolIDs) <= 0 {
+       if len(s.ProtocolIDs) <= 0 {
                for k, _ := range rc.Protocols {
-                       svc.ProtocolIDs = append(svc.ProtocolIDs, k)
+                       s.ProtocolIDs = append(s.ProtocolIDs, k)
                }
        }
-       if svc.TracingKey == "" {
-               svc.TracingKey = rc.Provider.TracingKey
+       if s.TracingKey == "" {
+               s.TracingKey = rc.Provider.TracingKey
        }
-       svc.export = true
-       return verify(svc)
+       err := s.check()
+       if err != nil {
+               panic(err)
+       }
+       s.export = true
+       return verify(s)
+}
+
+func (s *ServiceConfig) check() error {
+       // check if the limiter has been imported
+       if s.TpsLimiter != "" {
+               _, err := extension.GetTpsLimiter(s.TpsLimiter)
+               if err != nil {
+                       panic(err)
+               }
+       }
+       if s.TpsLimitStrategy != "" {
+               _, err := 
extension.GetTpsLimitStrategyCreator(s.TpsLimitStrategy)
+               if err != nil {
+                       panic(err)
+               }
+       }
+       if s.TpsLimitRejectedHandler != "" {
+               _, err := 
extension.GetRejectedExecutionHandler(s.TpsLimitRejectedHandler)
+               if err != nil {
+                       panic(err)
+               }
+       }
+
+       if s.TpsLimitInterval != "" {
+               tpsLimitInterval, err := strconv.ParseInt(s.TpsLimitInterval, 
0, 0)
+               if err != nil {
+                       return fmt.Errorf("[ServiceConfig] Cannot parse the 
configuration tps.limit.interval for service %s, please check your 
configuration", s.Interface)
+               }
+               if tpsLimitInterval < 0 {
+                       return fmt.Errorf("[ServiceConfig] The configuration 
tps.limit.interval for service %s must be positive, please check your 
configuration", s.Interface)
+               }
+       }
+
+       if s.TpsLimitRate != "" {
+               tpsLimitRate, err := strconv.ParseInt(s.TpsLimitRate, 0, 0)
+               if err != nil {
+                       return fmt.Errorf("[ServiceConfig] Cannot parse the 
configuration tps.limit.rate for service %s, please check your configuration", 
s.Interface)
+               }
+               if tpsLimitRate < 0 {
+                       return fmt.Errorf("[ServiceConfig] The configuration 
tps.limit.rate for service %s must be positive, please check your 
configuration", s.Interface)
+               }
+       }
+       return nil
 }
 
 // InitExported will set exported as false atom bool
-func (svc *ServiceConfig) InitExported() {
-       svc.exported = atomic.NewBool(false)
+func (s *ServiceConfig) InitExported() {
+       s.exported = atomic.NewBool(false)
 }
 
 // IsExport will return whether the service config is exported or not
-func (svc *ServiceConfig) IsExport() bool {
-       return svc.exported.Load()
+func (s *ServiceConfig) IsExport() bool {
+       return s.exported.Load()
 }
 
 // Get Random Port
@@ -159,35 +206,35 @@ func getRandomPort(protocolConfigs []*ProtocolConfig) 
*list.List {
 }
 
 // Export exports the service
-func (svc *ServiceConfig) Export() error {
+func (s *ServiceConfig) Export() error {
        // TODO: delay export
-       if svc.unexported != nil && svc.unexported.Load() {
-               err := perrors.Errorf("The service %v has already unexported!", 
svc.Interface)
+       if s.unexported != nil && s.unexported.Load() {
+               err := perrors.Errorf("The service %v has already unexported!", 
s.Interface)
                logger.Errorf(err.Error())
                return err
        }
-       if svc.exported != nil && svc.exported.Load() {
-               logger.Warnf("The service %v has already exported!", 
svc.Interface)
+       if s.exported != nil && s.exported.Load() {
+               logger.Warnf("The service %v has already exported!", 
s.Interface)
                return nil
        }
 
-       regUrls := loadRegistries(svc.RegistryIDs, svc.RCRegistriesMap, 
common.PROVIDER)
-       urlMap := svc.getUrlMap()
-       protocolConfigs := loadProtocol(svc.ProtocolIDs, svc.RCProtocolsMap)
+       regUrls := loadRegistries(s.RegistryIDs, s.RCRegistriesMap, 
common.PROVIDER)
+       urlMap := s.getUrlMap()
+       protocolConfigs := loadProtocol(s.ProtocolIDs, s.RCProtocolsMap)
        if len(protocolConfigs) == 0 {
-               logger.Warnf("The service %v's '%v' protocols don't has right 
protocolConfigs, Please check your configuration center and transfer protocol 
", svc.Interface, svc.ProtocolIDs)
+               logger.Warnf("The service %v's '%v' protocols don't has right 
protocolConfigs, Please check your configuration center and transfer protocol 
", s.Interface, s.ProtocolIDs)
                return nil
        }
 
        ports := getRandomPort(protocolConfigs)
        nextPort := ports.Front()
-       proxyFactory := extension.GetProxyFactory(svc.ProxyFactoryKey)
+       proxyFactory := extension.GetProxyFactory(s.ProxyFactoryKey)
        for _, proto := range protocolConfigs {
                // registry the service reflect
-               methods, err := common.ServiceMap.Register(svc.Interface, 
proto.Name, svc.Group, svc.Version, svc.rpcService)
+               methods, err := common.ServiceMap.Register(s.Interface, 
proto.Name, s.Group, s.Version, s.rpcService)
                if err != nil {
                        formatErr := perrors.Errorf("The service %v export the 
protocol %v error! Error message is %v.",
-                               svc.Interface, proto.Name, err.Error())
+                               s.Interface, proto.Name, err.Error())
                        logger.Errorf(formatErr.Error())
                        return formatErr
                }
@@ -198,44 +245,44 @@ func (svc *ServiceConfig) Export() error {
                        nextPort = nextPort.Next()
                }
                ivkURL := common.NewURLWithOptions(
-                       common.WithPath(svc.Interface),
+                       common.WithPath(s.Interface),
                        common.WithProtocol(proto.Name),
                        common.WithIp(proto.Ip),
                        common.WithPort(port),
                        common.WithParams(urlMap),
-                       common.WithParamsValue(constant.BeanNameKey, svc.id),
+                       common.WithParamsValue(constant.BeanNameKey, s.id),
                        //common.WithParamsValue(constant.SslEnabledKey, 
strconv.FormatBool(config.GetSslEnabled())),
                        common.WithMethods(strings.Split(methods, ",")),
-                       common.WithToken(svc.Token),
-                       common.WithParamsValue(constant.MetadataTypeKey, 
svc.metadataType),
+                       common.WithToken(s.Token),
+                       common.WithParamsValue(constant.MetadataTypeKey, 
s.metadataType),
                )
-               if len(svc.Tag) > 0 {
-                       ivkURL.AddParam(constant.Tagkey, svc.Tag)
+               if len(s.Tag) > 0 {
+                       ivkURL.AddParam(constant.Tagkey, s.Tag)
                }
 
                // post process the URL to be exported
-               svc.postProcessConfig(ivkURL)
+               s.postProcessConfig(ivkURL)
                // config post processor may set "export" to false
                if !ivkURL.GetParamBool(constant.ExportKey, true) {
                        return nil
                }
 
                if len(regUrls) > 0 {
-                       svc.cacheMutex.Lock()
-                       if svc.cacheProtocol == nil {
+                       s.cacheMutex.Lock()
+                       if s.cacheProtocol == nil {
                                logger.Debugf(fmt.Sprintf("First load the 
registry protocol, url is {%v}!", ivkURL))
-                               svc.cacheProtocol = 
extension.GetProtocol("registry")
+                               s.cacheProtocol = 
extension.GetProtocol("registry")
                        }
-                       svc.cacheMutex.Unlock()
+                       s.cacheMutex.Unlock()
 
                        for _, regUrl := range regUrls {
                                setRegistrySubURL(ivkURL, regUrl)
                                invoker := proxyFactory.GetInvoker(regUrl)
-                               exporter := svc.cacheProtocol.Export(invoker)
+                               exporter := s.cacheProtocol.Export(invoker)
                                if exporter == nil {
                                        return 
perrors.New(fmt.Sprintf("Registry protocol new exporter error, registry is 
{%v}, url is {%v}", regUrl, ivkURL))
                                }
-                               svc.exporters = append(svc.exporters, exporter)
+                               s.exporters = append(s.exporters, exporter)
                        }
                } else {
                        if ivkURL.GetParam(constant.InterfaceKey, "") == 
constant.MetadataServiceName {
@@ -253,11 +300,11 @@ func (svc *ServiceConfig) Export() error {
                        if exporter == nil {
                                return perrors.New(fmt.Sprintf("Filter protocol 
without registry new exporter error, url is {%v}", ivkURL))
                        }
-                       svc.exporters = append(svc.exporters, exporter)
+                       s.exporters = append(s.exporters, exporter)
                }
                publishServiceDefinition(ivkURL)
        }
-       svc.exported.Store(true)
+       s.exported.Store(true)
        return nil
 }
 
@@ -318,56 +365,56 @@ func loadRegistries(registryIds []string, registries 
map[string]*RegistryConfig,
 }
 
 // Unexport will call unexport of all exporters service config exported
-func (svc *ServiceConfig) Unexport() {
-       if !svc.exported.Load() {
+func (s *ServiceConfig) Unexport() {
+       if !s.exported.Load() {
                return
        }
-       if svc.unexported.Load() {
+       if s.unexported.Load() {
                return
        }
 
        func() {
-               svc.exportersLock.Lock()
-               defer svc.exportersLock.Unlock()
-               for _, exporter := range svc.exporters {
+               s.exportersLock.Lock()
+               defer s.exportersLock.Unlock()
+               for _, exporter := range s.exporters {
                        exporter.Unexport()
                }
-               svc.exporters = nil
+               s.exporters = nil
        }()
 
-       svc.exported.Store(false)
-       svc.unexported.Store(true)
+       s.exported.Store(false)
+       s.unexported.Store(true)
 }
 
 // Implement only store the @s and return
-func (svc *ServiceConfig) Implement(s common.RPCService) {
-       svc.rpcService = s
+func (s *ServiceConfig) Implement(rpcService common.RPCService) {
+       s.rpcService = rpcService
 }
 
-func (svc *ServiceConfig) getUrlMap() url.Values {
+func (s *ServiceConfig) getUrlMap() url.Values {
        urlMap := url.Values{}
        // first set user params
-       for k, v := range svc.Params {
+       for k, v := range s.Params {
                urlMap.Set(k, v)
        }
-       urlMap.Set(constant.InterfaceKey, svc.Interface)
+       urlMap.Set(constant.InterfaceKey, s.Interface)
        urlMap.Set(constant.TimestampKey, strconv.FormatInt(time.Now().Unix(), 
10))
-       urlMap.Set(constant.ClusterKey, svc.Cluster)
-       urlMap.Set(constant.LoadbalanceKey, svc.Loadbalance)
-       urlMap.Set(constant.WarmupKey, svc.Warmup)
-       urlMap.Set(constant.RetriesKey, svc.Retries)
-       if svc.Group != "" {
-               urlMap.Set(constant.GroupKey, svc.Group)
+       urlMap.Set(constant.ClusterKey, s.Cluster)
+       urlMap.Set(constant.LoadbalanceKey, s.Loadbalance)
+       urlMap.Set(constant.WarmupKey, s.Warmup)
+       urlMap.Set(constant.RetriesKey, s.Retries)
+       if s.Group != "" {
+               urlMap.Set(constant.GroupKey, s.Group)
        }
-       if svc.Version != "" {
-               urlMap.Set(constant.VersionKey, svc.Version)
+       if s.Version != "" {
+               urlMap.Set(constant.VersionKey, s.Version)
        }
        urlMap.Set(constant.RegistryRoleKey, strconv.Itoa(common.PROVIDER))
        urlMap.Set(constant.ReleaseKey, "dubbo-golang-"+constant.Version)
        urlMap.Set(constant.SideKey, (common.RoleType(common.PROVIDER)).Role())
-       urlMap.Set(constant.MessageSizeKey, 
strconv.Itoa(svc.GrpcMaxMessageSize))
+       urlMap.Set(constant.MessageSizeKey, strconv.Itoa(s.GrpcMaxMessageSize))
        // todo: move
-       urlMap.Set(constant.SerializationKey, svc.Serialization)
+       urlMap.Set(constant.SerializationKey, s.Serialization)
        // application config info
        ac := GetApplicationConfig()
        urlMap.Set(constant.ApplicationKey, ac.Name)
@@ -380,39 +427,39 @@ func (svc *ServiceConfig) getUrlMap() url.Values {
 
        // filter
        var filters string
-       if svc.Filter == "" {
+       if s.Filter == "" {
                filters = constant.DefaultServiceFilters
        } else {
-               filters = svc.Filter
+               filters = s.Filter
        }
-       if svc.adaptiveService {
+       if s.adaptiveService {
                filters += fmt.Sprintf(",%s", 
constant.AdaptiveServiceProviderFilterKey)
        }
        urlMap.Set(constant.ServiceFilterKey, filters)
 
        // filter special config
-       urlMap.Set(constant.AccessLogFilterKey, svc.AccessLog)
+       urlMap.Set(constant.AccessLogFilterKey, s.AccessLog)
        // tps limiter
-       urlMap.Set(constant.TPSLimitStrategyKey, svc.TpsLimitStrategy)
-       urlMap.Set(constant.TPSLimitIntervalKey, svc.TpsLimitInterval)
-       urlMap.Set(constant.TPSLimitRateKey, svc.TpsLimitRate)
-       urlMap.Set(constant.TPSLimiterKey, svc.TpsLimiter)
-       urlMap.Set(constant.TPSRejectedExecutionHandlerKey, 
svc.TpsLimitRejectedHandler)
-       urlMap.Set(constant.TracingConfigKey, svc.TracingKey)
+       urlMap.Set(constant.TPSLimitStrategyKey, s.TpsLimitStrategy)
+       urlMap.Set(constant.TPSLimitIntervalKey, s.TpsLimitInterval)
+       urlMap.Set(constant.TPSLimitRateKey, s.TpsLimitRate)
+       urlMap.Set(constant.TPSLimiterKey, s.TpsLimiter)
+       urlMap.Set(constant.TPSRejectedExecutionHandlerKey, 
s.TpsLimitRejectedHandler)
+       urlMap.Set(constant.TracingConfigKey, s.TracingKey)
 
        // execute limit filter
-       urlMap.Set(constant.ExecuteLimitKey, svc.ExecuteLimit)
-       urlMap.Set(constant.ExecuteRejectedExecutionHandlerKey, 
svc.ExecuteLimitRejectedHandler)
+       urlMap.Set(constant.ExecuteLimitKey, s.ExecuteLimit)
+       urlMap.Set(constant.ExecuteRejectedExecutionHandlerKey, 
s.ExecuteLimitRejectedHandler)
 
        // auth filter
-       urlMap.Set(constant.ServiceAuthKey, svc.Auth)
-       urlMap.Set(constant.ParameterSignatureEnableKey, svc.ParamSign)
+       urlMap.Set(constant.ServiceAuthKey, s.Auth)
+       urlMap.Set(constant.ParameterSignatureEnableKey, s.ParamSign)
 
        // whether to export or not
-       urlMap.Set(constant.ExportKey, strconv.FormatBool(svc.export))
+       urlMap.Set(constant.ExportKey, strconv.FormatBool(s.export))
        urlMap.Set(constant.PIDKey, fmt.Sprintf("%d", os.Getpid()))
 
-       for _, v := range svc.Methods {
+       for _, v := range s.Methods {
                prefix := "methods." + v.Name + "."
                urlMap.Set(prefix+constant.LoadbalanceKey, v.LoadBalance)
                urlMap.Set(prefix+constant.RetriesKey, v.Retries)
@@ -430,10 +477,10 @@ func (svc *ServiceConfig) getUrlMap() url.Values {
 }
 
 // GetExportedUrls will return the url in service config's exporter
-func (svc *ServiceConfig) GetExportedUrls() []*common.URL {
-       if svc.exported.Load() {
+func (s *ServiceConfig) GetExportedUrls() []*common.URL {
+       if s.exported.Load() {
                var urls []*common.URL
-               for _, exporter := range svc.exporters {
+               for _, exporter := range s.exporters {
                        urls = append(urls, exporter.GetInvoker().GetURL())
                }
                return urls
@@ -442,7 +489,7 @@ func (svc *ServiceConfig) GetExportedUrls() []*common.URL {
 }
 
 // postProcessConfig asks registered ConfigPostProcessor to post-process the 
current ServiceConfig.
-func (svc *ServiceConfig) postProcessConfig(url *common.URL) {
+func (s *ServiceConfig) postProcessConfig(url *common.URL) {
        for _, p := range extension.GetConfigPostProcessors() {
                p.PostProcessServiceConfig(url)
        }
diff --git a/filter/tps/filter.go b/filter/tps/filter.go
index 6577059..f57e909 100644
--- a/filter/tps/filter.go
+++ b/filter/tps/filter.go
@@ -72,7 +72,12 @@ func (t *tpsLimitFilter) Invoke(ctx context.Context, invoker 
protocol.Invoker, i
        tpsLimiter := url.GetParam(constant.TPSLimiterKey, "")
        rejectedExeHandler := 
url.GetParam(constant.TPSRejectedExecutionHandlerKey, constant.DefaultKey)
        if len(tpsLimiter) > 0 {
-               allow := 
extension.GetTpsLimiter(tpsLimiter).IsAllowable(invoker.GetURL(), invocation)
+               limiter, err := extension.GetTpsLimiter(tpsLimiter)
+               if err != nil {
+                       logger.Warn(err)
+                       return invoker.Invoke(ctx, invocation)
+               }
+               allow := limiter.IsAllowable(invoker.GetURL(), invocation)
                if allow {
                        return invoker.Invoke(ctx, invocation)
                }
diff --git a/filter/tps/limiter/method_service.go 
b/filter/tps/limiter/method_service.go
index 41c16f8..8925e04 100644
--- a/filter/tps/limiter/method_service.go
+++ b/filter/tps/limiter/method_service.go
@@ -31,6 +31,7 @@ import (
        "dubbo.apache.org/dubbo-go/v3/common"
        "dubbo.apache.org/dubbo-go/v3/common/constant"
        "dubbo.apache.org/dubbo-go/v3/common/extension"
+       "dubbo.apache.org/dubbo-go/v3/common/logger"
        "dubbo.apache.org/dubbo-go/v3/filter"
        "dubbo.apache.org/dubbo-go/v3/protocol"
 )
@@ -150,6 +151,7 @@ func (limiter MethodServiceTpsLimiter) IsAllowable(url 
*common.URL, invocation p
 
        if limitRate < 0 {
                // the limitTarget is not necessary to be limited.
+               logger.Errorf("Found error configuration value of 
tps.limit.rate for the invocation %s, ignores TPS Limiter", 
url.ServiceKey()+"#"+invocation.MethodName())
                return true
        }
 
@@ -157,13 +159,18 @@ func (limiter MethodServiceTpsLimiter) IsAllowable(url 
*common.URL, invocation p
                constant.TPSLimitIntervalKey,
                constant.DefaultTPSLimitInterval)
        if limitInterval <= 0 {
-               panic(fmt.Sprintf("The interval must be positive, please check 
your configuration! url: %s", url.String()))
+               logger.Errorf(fmt.Sprintf("Found error configuration value of 
tps.limit.interval for the invocation %s, ignores TPS Limiter", 
url.ServiceKey()+"#"+invocation.MethodName()))
+               return true
        }
 
        // find the strategy config and then create one
        limitStrategyConfig := 
url.GetParam(methodConfigPrefix+constant.TPSLimitStrategyKey,
                url.GetParam(constant.TPSLimitStrategyKey, constant.DefaultKey))
-       limitStateCreator := 
extension.GetTpsLimitStrategyCreator(limitStrategyConfig)
+       limitStateCreator, err := 
extension.GetTpsLimitStrategyCreator(limitStrategyConfig)
+       if err != nil {
+               logger.Warn(err)
+               return true
+       }
 
        // we using loadOrStore to ensure thread-safe
        limitState, _ = limiter.tpsState.LoadOrStore(limitTarget, 
limitStateCreator.Create(int(limitRate), int(limitInterval)))
@@ -178,22 +185,24 @@ func getLimitConfig(methodLevelConfig string,
        url *common.URL,
        invocation protocol.Invocation,
        configKey string,
-       defaultVal string) int64 {
+       defaultVal int64) int64 {
 
        if len(methodLevelConfig) > 0 {
                result, err := strconv.ParseInt(methodLevelConfig, 0, 0)
                if err != nil {
-                       panic(fmt.Sprintf("The %s for invocation %s # %s must 
be positive, please check your configuration!",
+                       logger.Error(fmt.Sprintf("The %s for invocation %s # %s 
must be positive, please check your configuration!",
                                configKey, url.ServiceKey(), 
invocation.MethodName()))
+                       return defaultVal
                }
                return result
        }
 
        // actually there is no method-level configuration, so we use the 
service-level configuration
 
-       result, err := strconv.ParseInt(url.GetParam(configKey, defaultVal), 0, 
0)
+       result, err := strconv.ParseInt(url.GetParam(configKey, ""), 0, 0)
        if err != nil {
-               panic(fmt.Sprintf("Cannot parse the configuration %s, please 
check your configuration!", configKey))
+               logger.Errorf(fmt.Sprintf("Cannot parse the configuration %s, 
please check your configuration!", configKey))
+               return defaultVal
        }
        return result
 }
diff --git a/filter/tps/limiter/method_service_test.go 
b/filter/tps/limiter/method_service_test.go
index 21886fe..e6a3e93 100644
--- a/filter/tps/limiter/method_service_test.go
+++ b/filter/tps/limiter/method_service_test.go
@@ -47,7 +47,8 @@ func 
TestMethodServiceTpsLimiterImplIsAllowableOnlyServiceLevel(t *testing.T) {
        invokeUrl := common.NewURLWithOptions(
                common.WithParams(url.Values{}),
                common.WithParamsValue(constant.InterfaceKey, methodName),
-               common.WithParamsValue(constant.TPSLimitRateKey, "20"))
+               common.WithParamsValue(constant.TPSLimitRateKey, "20"),
+               common.WithParamsValue(constant.TPSLimitIntervalKey, "60000"))
 
        mockStrategyImpl := strategy.NewMockTpsLimitStrategy(ctrl)
        mockStrategyImpl.EXPECT().IsAllowable().Return(true).Times(1)

Reply via email to