This is an automated email from the ASF dual-hosted git repository.
alexstocks pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/dubbo-go.git
The following commit(s) were added to refs/heads/develop by this push:
new 60bfb76f4 optimize config (#3034)
60bfb76f4 is described below
commit 60bfb76f4fcfbde0b4438f3c112a8348d1a0479c
Author: KamTo Hung <[email protected]>
AuthorDate: Tue Nov 25 09:34:27 2025 +0800
optimize config (#3034)
* fix(protocol): support config priority with global fallback and eliminate
test YAML errors
---------
Co-authored-by: Sirui Huang <[email protected]>
Co-authored-by: Sirui Huang <[email protected]>
Co-authored-by: nanjiek <[email protected]>
Co-authored-by: Alanxtl <[email protected]>
Co-authored-by: Xuetao Li <[email protected]>
---
client/action.go | 3 +
client/client.go | 3 +
client/options.go | 63 ++++++--
common/constant/default.go | 5 +
common/constant/key.go | 6 +
common/extension/registry_directory.go | 14 +-
compat.go | 33 +++-
config/consumer_config.go | 4 -
config_center/mock_dynamic_config.go | 6 +
dubbo.go | 26 ++-
protocol/dubbo3/internal/client.go | 1 +
protocol/dubbo3/internal/server.go | 1 +
protocol/grpc/client.go | 63 +++++---
protocol/grpc/server.go | 47 ++++--
protocol/triple/dubbo3_invoker.go | 6 +
registry/directory/directory_test.go | 5 +-
registry/protocol/protocol.go | 128 +++++++++++----
registry/protocol/protocol_test.go | 117 ++++++++++++--
remoting/getty/config.go | 20 +--
remoting/getty/getty_client.go | 41 ++++-
remoting/getty/getty_client_test.go | 26 ++-
remoting/getty/getty_server.go | 41 ++++-
remoting/getty/getty_server_test.go | 19 ++-
server/action.go | 31 ++--
server/server.go | 174 +++++++++++++++------
tools/protoc-gen-triple-openapi/constant/format.go | 2 +-
26 files changed, 685 insertions(+), 200 deletions(-)
diff --git a/client/action.go b/client/action.go
index 1c85908cf..fd6a3b4d2 100644
--- a/client/action.go
+++ b/client/action.go
@@ -148,7 +148,10 @@ func (refOpts *ReferenceOptions) refer(srv
common.RPCService, info *ClientInfo)
// for new triple non-IDL mode
// TODO: remove ISIDL after old triple removed
common.WithParamsValue(constant.IDLMode, ref.IDLMode),
+ common.WithAttribute(constant.ApplicationKey,
refOpts.Application),
+ common.WithAttribute(constant.ShutdownConfigPrefix,
refOpts.Shutdown),
common.WithAttribute(constant.ConsumerConfigKey,
refOpts.Consumer),
+ common.WithAttribute(constant.ProtocolConfigKey, ref.Protocol),
common.WithAttribute(constant.RegistriesConfigKey,
refOpts.Registries),
)
diff --git a/client/client.go b/client/client.go
index 6fb0c7104..546f79ba0 100644
--- a/client/client.go
+++ b/client/client.go
@@ -163,13 +163,16 @@ func (cli *Client) dial(interfaceName string, info
*ClientInfo, srv any, opts ..
newRefOpts := defaultReferenceOptions()
finalOpts := []ReferenceOption{
setReference(cli.cliOpts.overallReference),
+ setApplication(cli.cliOpts.Application),
setApplicationCompat(cli.cliOpts.applicationCompat),
setRegistriesCompat(cli.cliOpts.registriesCompat),
setRegistries(cli.cliOpts.Registries),
setConsumer(cli.cliOpts.Consumer),
+ setShutdown(cli.cliOpts.Shutdown),
setMetrics(cli.cliOpts.Metrics),
setOtel(cli.cliOpts.Otel),
setTLS(cli.cliOpts.TLS),
+ setProtocols(cli.cliOpts.Protocols),
// this config must be set after Reference initialized
setInterfaceName(interfaceName),
}
diff --git a/client/options.go b/client/options.go
index b6dd1f7ae..60a4f6c3e 100644
--- a/client/options.go
+++ b/client/options.go
@@ -41,12 +41,15 @@ import (
)
type ReferenceOptions struct {
- Reference *global.ReferenceConfig
- Consumer *global.ConsumerConfig
- Metrics *global.MetricsConfig
- Otel *global.OtelConfig
- TLS *global.TLSConfig
- Registries map[string]*global.RegistryConfig
+ Reference *global.ReferenceConfig
+ Consumer *global.ConsumerConfig
+ Application *global.ApplicationConfig
+ Shutdown *global.ShutdownConfig
+ Metrics *global.MetricsConfig
+ Otel *global.OtelConfig
+ TLS *global.TLSConfig
+ Protocols map[string]*global.ProtocolConfig
+ Registries map[string]*global.RegistryConfig
pxy *proxy.Proxy
id string
@@ -62,11 +65,14 @@ type ReferenceOptions struct {
func defaultReferenceOptions() *ReferenceOptions {
return &ReferenceOptions{
- Reference: global.DefaultReferenceConfig(),
- Metrics: global.DefaultMetricsConfig(),
- Otel: global.DefaultOtelConfig(),
- TLS: global.DefaultTLSConfig(),
- Registries: global.DefaultRegistriesConfig(),
+ Reference: global.DefaultReferenceConfig(),
+ Application: global.DefaultApplicationConfig(),
+ Shutdown: global.DefaultShutdownConfig(),
+ Metrics: global.DefaultMetricsConfig(),
+ Otel: global.DefaultOtelConfig(),
+ TLS: global.DefaultTLSConfig(),
+ Protocols: make(map[string]*global.ProtocolConfig),
+ Registries: global.DefaultRegistriesConfig(),
}
}
@@ -504,6 +510,28 @@ func setTLS(tls *global.TLSConfig) ReferenceOption {
}
}
+func setApplication(application *global.ApplicationConfig) ReferenceOption {
+ return func(opts *ReferenceOptions) {
+ opts.Application = application
+ }
+}
+
+// setProtocols sets the protocols configuration for the service reference.
+// This is an internal function used by the framework to configure protocol
settings.
+// It accepts a map of protocol configurations where the key is the protocol
name
+// and the value is the corresponding protocol configuration.
+func setProtocols(protocols map[string]*global.ProtocolConfig) ReferenceOption
{
+ return func(opts *ReferenceOptions) {
+ opts.Protocols = protocols
+ }
+}
+
+func setShutdown(shutdown *global.ShutdownConfig) ReferenceOption {
+ return func(opts *ReferenceOptions) {
+ opts.Shutdown = shutdown
+ }
+}
+
func setRegistries(regs map[string]*global.RegistryConfig) ReferenceOption {
return func(opts *ReferenceOptions) {
opts.Registries = regs
@@ -518,6 +546,7 @@ type ClientOptions struct {
Metrics *global.MetricsConfig
Otel *global.OtelConfig
TLS *global.TLSConfig
+ Protocols map[string]*global.ProtocolConfig
overallReference *global.ReferenceConfig
applicationCompat *config.ApplicationConfig
@@ -933,7 +962,7 @@ func SetClientRegistries(regs
map[string]*global.RegistryConfig) ClientOption {
}
}
-func SetApplication(application *global.ApplicationConfig) ClientOption {
+func SetClientApplication(application *global.ApplicationConfig) ClientOption {
return func(opts *ClientOptions) {
opts.Application = application
}
@@ -969,6 +998,16 @@ func SetClientTLS(tls *global.TLSConfig) ClientOption {
}
}
+// SetClientProtocols sets the protocols configuration for the client.
+// This function is used by the framework to configure protocol settings from
global configuration.
+// It accepts a map of protocol configurations where the key is the protocol
name
+// and the value is the corresponding protocol configuration.
+func SetClientProtocols(protocols map[string]*global.ProtocolConfig)
ClientOption {
+ return func(opts *ClientOptions) {
+ opts.Protocols = protocols
+ }
+}
+
// todo: need to be consistent with MethodConfig
type CallOptions struct {
RequestTimeout string
diff --git a/common/constant/default.go b/common/constant/default.go
index 3b0a4bd6c..c4551a3fc 100644
--- a/common/constant/default.go
+++ b/common/constant/default.go
@@ -115,6 +115,11 @@ const (
// graceful shutdown
DefaultGracefulShutdownTimeout = 10 * time.Second
+
+ // MaxWheelTimeSpan is the consumer-side max wait time for the heartbeat period
+ MaxWheelTimeSpan = 900e9 // 900s, 15 minutes
+ // DefaultHeartbeatTimeout consumer default heartbeat timeout
+ DefaultHeartbeatTimeout = 60 * time.Second
)
const (
diff --git a/common/constant/key.go b/common/constant/key.go
index bf61b1a32..56254299f 100644
--- a/common/constant/key.go
+++ b/common/constant/key.go
@@ -167,6 +167,8 @@ const (
RpcServiceKey = "rpc-service"
ClientInfoKey = "client-info"
TLSConfigKey = "tls-config"
+ ProviderConfigKey = "provider-config"
+ ProtocolConfigKey = "protocol-config"
)
const (
@@ -493,6 +495,10 @@ const (
TripleGoMethodName = "XXX_TRIPLE_GO_METHOD_NAME"
)
+const (
+ RpcServiceInterfaceName = "RPCSERVICE_INTERFACE_NAME"
+)
+
// Weight constants for Nacos instance registration
const (
DefaultNacosWeight = 1.0 // Default weight if not specified or
invalid
diff --git a/common/extension/registry_directory.go
b/common/extension/registry_directory.go
index 32309dea6..d886c6d73 100644
--- a/common/extension/registry_directory.go
+++ b/common/extension/registry_directory.go
@@ -43,22 +43,22 @@ func SetDirectory(key string, v registryDirectory) {
}
// GetDefaultRegistryDirectory finds the registryDirectory with url and
registry
-func GetDefaultRegistryDirectory(config *common.URL, registry
registry.Registry) (directory.Directory, error) {
+func GetDefaultRegistryDirectory(url *common.URL, registry registry.Registry)
(directory.Directory, error) {
if defaultDirectory == nil {
panic("registry directory is not existing, make sure you have
import the package.")
}
- return defaultDirectory(config, registry)
+ return defaultDirectory(url, registry)
}
// GetDirectoryInstance finds the registryDirectory with url and registry
-func GetDirectoryInstance(config *common.URL, registry registry.Registry)
(directory.Directory, error) {
- key := config.Protocol
+func GetDirectoryInstance(url *common.URL, registry registry.Registry)
(directory.Directory, error) {
+ key := url.Protocol
if key == "" {
- return GetDefaultRegistryDirectory(config, registry)
+ return GetDefaultRegistryDirectory(url, registry)
}
if directories[key] == nil {
logger.Warn("registry directory " + key + " does not exist,
make sure you have import the package, will use the default directory type.")
- return GetDefaultRegistryDirectory(config, registry)
+ return GetDefaultRegistryDirectory(url, registry)
}
- return directories[key](config, registry)
+ return directories[key](url, registry)
}
diff --git a/compat.go b/compat.go
index 970f72c41..e61261e9c 100644
--- a/compat.go
+++ b/compat.go
@@ -506,7 +506,7 @@ func compatInstanceOptions(cr *config.RootConfig, rc
*InstanceOptions) {
proCompat := make(map[string]*global.ProtocolConfig)
for k, v := range cr.Protocols {
- proCompat[k] = compatGlobalProtocolConfig(v)
+ proCompat[k] = CompatGlobalProtocolConfig(v)
}
regCompat := make(map[string]*global.RegistryConfig)
@@ -531,7 +531,7 @@ func compatInstanceOptions(cr *config.RootConfig, rc
*InstanceOptions) {
rc.Profiles = compatGlobalProfilesConfig(cr.Profiles)
}
-func compatGlobalProtocolConfig(c *config.ProtocolConfig)
*global.ProtocolConfig {
+func CompatGlobalProtocolConfig(c *config.ProtocolConfig)
*global.ProtocolConfig {
if c == nil {
return nil
}
@@ -546,6 +546,17 @@ func compatGlobalProtocolConfig(c *config.ProtocolConfig)
*global.ProtocolConfig
}
}
+func CompatGlobalProtocolConfigMap(m map[string]*config.ProtocolConfig)
map[string]*global.ProtocolConfig {
+ if m == nil {
+ return nil
+ }
+ protocols := make(map[string]*global.ProtocolConfig, len(m))
+ for k, v := range m {
+ protocols[k] = CompatGlobalProtocolConfig(v)
+ }
+ return protocols
+}
+
// just for compat
func compatGlobalTripleConfig(c *config.TripleConfig) *global.TripleConfig {
if c == nil {
@@ -657,7 +668,7 @@ func compatGlobalProviderConfig(c *config.ProviderConfig)
*global.ProviderConfig
}
services := make(map[string]*global.ServiceConfig)
for key, svc := range c.Services {
- services[key] = compatGlobalServiceConfig(svc)
+ services[key] = CompatGlobalServiceConfig(svc)
}
return &global.ProviderConfig{
Filter: c.Filter,
@@ -674,7 +685,7 @@ func compatGlobalProviderConfig(c *config.ProviderConfig)
*global.ProviderConfig
}
}
-func compatGlobalServiceConfig(c *config.ServiceConfig) *global.ServiceConfig {
+func CompatGlobalServiceConfig(c *config.ServiceConfig) *global.ServiceConfig {
if c == nil {
return nil
}
@@ -684,7 +695,7 @@ func compatGlobalServiceConfig(c *config.ServiceConfig)
*global.ServiceConfig {
}
protocols := make(map[string]*global.ProtocolConfig)
for key, pro := range c.RCProtocolsMap {
- protocols[key] = compatGlobalProtocolConfig(pro)
+ protocols[key] = CompatGlobalProtocolConfig(pro)
}
registries := make(map[string]*global.RegistryConfig)
for key, reg := range c.RCRegistriesMap {
@@ -970,3 +981,15 @@ func compatGlobalProfilesConfig(c *config.ProfilesConfig)
*global.ProfilesConfig
Active: c.Active,
}
}
+
+func CompatGlobalTLSConfig(c *config.TLSConfig) *global.TLSConfig {
+ if c == nil {
+ return nil
+ }
+ return &global.TLSConfig{
+ CACertFile: c.CACertFile,
+ TLSCertFile: c.TLSCertFile,
+ TLSKeyFile: c.TLSKeyFile,
+ TLSServerName: c.TLSServerName,
+ }
+}
diff --git a/config/consumer_config.go b/config/consumer_config.go
index 79f3238c3..80de82278 100644
--- a/config/consumer_config.go
+++ b/config/consumer_config.go
@@ -36,10 +36,6 @@ import (
"dubbo.apache.org/dubbo-go/v3/common/constant"
)
-const (
- MaxWheelTimeSpan = 900e9 // 900s, 15 minute
-)
-
// ConsumerConfig is Consumer default configuration
type ConsumerConfig struct {
Filter string
`yaml:"filter" json:"filter,omitempty" property:"filter"`
diff --git a/config_center/mock_dynamic_config.go
b/config_center/mock_dynamic_config.go
index 76ffeea9c..35e78070a 100644
--- a/config_center/mock_dynamic_config.go
+++ b/config_center/mock_dynamic_config.go
@@ -18,6 +18,7 @@
package config_center
import (
+ "strings"
"sync"
)
@@ -149,6 +150,11 @@ func (c *MockDynamicConfiguration) GetInternalProperty(key
string, opts ...Optio
// GetRule gets properties of MockDynamicConfiguration
func (c *MockDynamicConfiguration) GetRule(key string, opts ...Option)
(string, error) {
+ // For configurator-related queries, return empty to avoid YAML parsing
errors
+ // The actual configuration will be provided by
MockServiceConfigEvent/MockApplicationConfigEvent
+ if strings.Contains(key, "configurator") || strings.HasSuffix(key,
"configurators") {
+ return "", nil
+ }
return c.GetProperties(key, opts...)
}
diff --git a/dubbo.go b/dubbo.go
index 93e3ad5a5..2a132d3cb 100644
--- a/dubbo.go
+++ b/dubbo.go
@@ -76,6 +76,7 @@ func (ins *Instance) NewClient(opts ...client.ClientOption)
(*client.Client, err
metricsCfg := ins.insOpts.CloneMetrics()
otelCfg := ins.insOpts.CloneOtel()
tlsCfg := ins.insOpts.CloneTLSConfig()
+ protocolsCfg := ins.insOpts.CloneProtocols()
if conCfg != nil {
if !conCfg.Check {
@@ -92,7 +93,7 @@ func (ins *Instance) NewClient(opts ...client.ClientOption)
(*client.Client, err
)
}
if appCfg != nil {
- cliOpts = append(cliOpts, client.SetApplication(appCfg))
+ cliOpts = append(cliOpts, client.SetClientApplication(appCfg))
}
if regsCfg != nil {
cliOpts = append(cliOpts, client.SetClientRegistries(regsCfg))
@@ -109,6 +110,9 @@ func (ins *Instance) NewClient(opts ...client.ClientOption)
(*client.Client, err
if tlsCfg != nil {
cliOpts = append(cliOpts, client.SetClientTLS(tlsCfg))
}
+ if protocolsCfg != nil {
+ cliOpts = append(cliOpts,
client.SetClientProtocols(protocolsCfg))
+ }
// options passed by users has higher priority
cliOpts = append(cliOpts, opts...)
@@ -134,6 +138,7 @@ func (ins *Instance) NewServer(opts ...server.ServerOption)
(*server.Server, err
metricsCfg := ins.insOpts.CloneMetrics()
otelCfg := ins.insOpts.CloneOtel()
tlsCfg := ins.insOpts.CloneTLSConfig()
+ protocolsCfg := ins.insOpts.CloneProtocols()
if appCfg != nil {
srvOpts = append(srvOpts,
@@ -166,6 +171,9 @@ func (ins *Instance) NewServer(opts ...server.ServerOption)
(*server.Server, err
if tlsCfg != nil {
srvOpts = append(srvOpts, server.SetServerTLS(tlsCfg))
}
+ if protocolsCfg != nil {
+ srvOpts = append(srvOpts,
server.SetServerProtocols(protocolsCfg))
+ }
// options passed by users have higher priority
srvOpts = append(srvOpts, opts...)
@@ -190,11 +198,16 @@ func (ins *Instance) start() (err error) {
return err
}
-// loadProvider loads the service provider.
+// loadProvider loads and initializes the service provider
+// Flow:
+// 1. Configure server options with Provider settings if available
+// 2. Create server instance with configured options
+// 3. Register all defined provider services (both IDL and non-IDL modes)
+// 4. Start server in a separate goroutine to handle incoming requests
func (ins *Instance) loadProvider() error {
var err error
var srvOpts []server.ServerOption
-
+ // Step 1: Build server options - add Provider configuration if exists
if ins.insOpts.Provider != nil {
srvOpts = append(srvOpts,
server.SetServerProvider(ins.insOpts.Provider))
}
@@ -202,20 +215,23 @@ func (ins *Instance) loadProvider() error {
if err != nil {
return err
}
- // register services
+ // Step 2: Register all defined provider services
proLock.RLock()
defer proLock.RUnlock()
for _, definition := range providerServices {
+ // Step 3: Register service based on mode
if definition.Info != nil {
+ // IDL mode: Register with service information
(interface definition available)
err = srv.Register(definition.Handler, definition.Info,
definition.Opts...)
} else {
- // if Info in nil, it means non-idl mode
+ // Non-IDL mode: Register service handler directly (no
interface definition)
err = srv.RegisterService(definition.Handler,
definition.Opts...)
}
if err != nil {
return err
}
}
+ // Step 4: Start server
go func() {
if err = srv.Serve(); err != nil {
logger.Fatalf("Failed to start server, err: %v", err)
diff --git a/protocol/dubbo3/internal/client.go
b/protocol/dubbo3/internal/client.go
index 06f9f045c..69c03db23 100644
--- a/protocol/dubbo3/internal/client.go
+++ b/protocol/dubbo3/internal/client.go
@@ -21,6 +21,7 @@ import (
"dubbo.apache.org/dubbo-go/v3/config"
)
+// TODO: After the config is removed, remove the test
func init() {
// for pb client
config.SetConsumerServiceByInterfaceName("org.apache.dubbo.DubboGreeterImpl",
&GreeterClientImpl{})
diff --git a/protocol/dubbo3/internal/server.go
b/protocol/dubbo3/internal/server.go
index 055bf1b9b..9966635c8 100644
--- a/protocol/dubbo3/internal/server.go
+++ b/protocol/dubbo3/internal/server.go
@@ -45,6 +45,7 @@ func (s *Server) SayHello(ctx context.Context, in
*HelloRequest) (*HelloReply, e
}
// InitDubboServer creates global gRPC server.
+// TODO: After the config is removed, remove the test
func InitDubboServer() {
serviceConfig := config.NewServiceConfigBuilder().
SetInterface("org.apache.dubbo.DubboGreeterImpl").
diff --git a/protocol/grpc/client.go b/protocol/grpc/client.go
index c6b389d6e..644dd45a1 100644
--- a/protocol/grpc/client.go
+++ b/protocol/grpc/client.go
@@ -41,6 +41,7 @@ import (
)
import (
+ "dubbo.apache.org/dubbo-go/v3"
"dubbo.apache.org/dubbo-go/v3/common"
"dubbo.apache.org/dubbo-go/v3/common/constant"
"dubbo.apache.org/dubbo-go/v3/config"
@@ -59,7 +60,9 @@ type Client struct {
// NewClient creates a new gRPC client.
func NewClient(url *common.URL) (*Client, error) {
- clientConfInitOnce.Do(clientInit)
+ clientConfInitOnce.Do(func() {
+ clientInit(url)
+ })
// If global trace instance was set, it means trace function enabled.
// If not, will return NoopTracer.
@@ -139,8 +142,14 @@ func NewClient(url *common.URL) (*Client, error) {
}
key := url.GetParam(constant.InterfaceKey, "")
- impl := config.GetConsumerServiceByInterfaceName(key)
- invoker := getInvoker(impl, conn)
+ //TODO: Temporary compatibility with old APIs, can be removed later
+ consumerService := config.GetConsumerServiceByInterfaceName(key)
+ if consumerService == nil {
+ if rpcService, ok := url.GetAttribute(constant.RpcServiceKey);
ok {
+ consumerService = rpcService
+ }
+ }
+ invoker := getInvoker(consumerService, conn)
return &Client{
ClientConn: conn,
@@ -148,7 +157,7 @@ func NewClient(url *common.URL) (*Client, error) {
}, nil
}
-func clientInit() {
+func clientInit(url *common.URL) {
// load rootConfig from runtime
rootConfig := config.GetRootConfig()
@@ -167,27 +176,41 @@ func clientInit() {
}()
if rootConfig.Application == nil {
- return
+ app := url.GetParam(constant.ApplicationKey, "")
+ if len(app) == 0 {
+ return
+ }
}
- protocolConf := config.GetRootConfig().Protocols
+ //TODO: Temporary compatibility with old APIs, can be removed later
+ protocolConf :=
dubbo.CompatGlobalProtocolConfigMap(rootConfig.Protocols)
if protocolConf == nil {
- logger.Info("protocol_conf default use dubbo config")
- } else {
- grpcConf := protocolConf[GRPC]
- if grpcConf == nil {
- logger.Warnf("grpcConf is nil")
- return
- }
- grpcConfByte, err := yaml.Marshal(grpcConf)
- if err != nil {
- panic(err)
- }
- err = yaml.Unmarshal(grpcConfByte, clientConf)
- if err != nil {
- panic(err)
+ if protocolConfRaw, ok :=
url.GetAttribute(constant.ProtocolConfigKey); ok {
+ protocolConfig, ok :=
protocolConfRaw.(map[string]*global.ProtocolConfig)
+ if !ok {
+ logger.Warnf("protocolConfig assert failed")
+ return
+ }
+ if protocolConfig == nil {
+ logger.Warnf("protocolConfig is nil")
+ return
+ }
+ protocolConf = protocolConfig
}
}
+ grpcConf := protocolConf[GRPC]
+ if grpcConf == nil {
+ logger.Warnf("grpcConf is nil")
+ return
+ }
+ grpcConfByte, err := yaml.Marshal(grpcConf)
+ if err != nil {
+ panic(err)
+ }
+ err = yaml.Unmarshal(grpcConfByte, clientConf)
+ if err != nil {
+ panic(err)
+ }
}
func getInvoker(impl any, conn *grpc.ClientConn) any {
diff --git a/protocol/grpc/server.go b/protocol/grpc/server.go
index 7cab2ff76..635de4e1f 100644
--- a/protocol/grpc/server.go
+++ b/protocol/grpc/server.go
@@ -41,6 +41,7 @@ import (
)
import (
+ "dubbo.apache.org/dubbo-go/v3"
"dubbo.apache.org/dubbo-go/v3/common"
"dubbo.apache.org/dubbo-go/v3/common/constant"
"dubbo.apache.org/dubbo-go/v3/config"
@@ -150,22 +151,37 @@ func (s *Server) Start(url *common.URL) {
s.grpcServer = server
go func() {
- providerServices := config.GetProviderConfig().Services
-
- if len(providerServices) == 0 {
- panic("provider service map is null")
+ providerServices := getProviderServices(url)
+ if providerServices == nil {
+ logger.Error("no provider service found")
+ return
}
- // wait all exporter ready , then set proxy impl and grpc
registerService
+ // wait all exporter ready, then set proxy impl and grpc
registerService
waitGrpcExporter(providerServices)
registerService(providerServices, server)
reflection.Register(server)
-
- if err = server.Serve(lis); err != nil {
+ if err := server.Serve(lis); err != nil {
logger.Errorf("server serve failed with err: %v", err)
}
}()
}
+// getProviderServices retrieves provider services from config or URL
attributes
+func getProviderServices(url *common.URL) map[string]*global.ServiceConfig {
+ providerServices := config.GetProviderConfig().Services
+ if len(providerServices) > 0 {
+ return convertServiceMap(providerServices)
+ }
+ // TODO #2741 old config compatibility
+ if providerConfRaw, ok := url.GetAttribute(constant.ProviderConfigKey);
ok {
+ if providerConf, ok :=
providerConfRaw.(*global.ProviderConfig); ok && providerConf != nil {
+ return providerConf.Services
+ }
+ logger.Error("illegal provider config")
+ }
+ return nil
+}
+
// getSyncMapLen get sync map len
func getSyncMapLen(m *sync.Map) int {
length := 0
@@ -177,8 +193,17 @@ func getSyncMapLen(m *sync.Map) int {
return length
}
+func convertServiceMap(providerServices map[string]*config.ServiceConfig)
map[string]*global.ServiceConfig {
+ result := make(map[string]*global.ServiceConfig)
+ for k, v := range providerServices {
+ result[k] = dubbo.CompatGlobalServiceConfig(v)
+ }
+ return result
+}
+
// waitGrpcExporter wait until len(providerServices) = len(ExporterMap)
-func waitGrpcExporter(providerServices map[string]*config.ServiceConfig) {
+// TODO #2741 old config compatibility
+func waitGrpcExporter(providerServices map[string]*global.ServiceConfig) {
t := time.NewTicker(50 * time.Millisecond)
defer t.Stop()
pLen := len(providerServices)
@@ -199,14 +224,16 @@ func waitGrpcExporter(providerServices
map[string]*config.ServiceConfig) {
}
// registerService SetProxyImpl invoker and grpc service
-func registerService(providerServices map[string]*config.ServiceConfig, server
*grpc.Server) {
+// TODO #2741 old config compatibility
+func registerService(providerServices map[string]*global.ServiceConfig, server
*grpc.Server) {
for key, providerService := range providerServices {
+
+ //TODO: Temporary compatibility with old APIs, can be removed
later
service := config.GetProviderService(key)
ds, ok := service.(DubboGrpcService)
if !ok {
panic("illegal service type registered")
}
-
serviceKey := common.ServiceKey(providerService.Interface,
providerService.Group, providerService.Version)
exporter, _ := grpcProtocol.ExporterMap().Load(serviceKey)
if exporter == nil {
diff --git a/protocol/triple/dubbo3_invoker.go
b/protocol/triple/dubbo3_invoker.go
index 262d6f91f..b84176138 100644
--- a/protocol/triple/dubbo3_invoker.go
+++ b/protocol/triple/dubbo3_invoker.go
@@ -80,7 +80,13 @@ func NewDubbo3Invoker(url *common.URL) (*DubboInvoker,
error) {
// for triple pb serialization. The bean name from provider is the
provider reference key,
// which can't locate the target consumer stub, so we use interface
key..
interfaceKey := url.GetParam(constant.InterfaceKey, "")
+ //TODO: Temporary compatibility with old APIs, can be removed later
consumerService :=
config.GetConsumerServiceByInterfaceName(interfaceKey)
+ if consumerService == nil {
+ if rpcService, ok := url.GetAttribute(constant.RpcServiceKey);
ok {
+ consumerService = rpcService
+ }
+ }
dubboSerializerType := url.GetParam(constant.SerializationKey,
constant.ProtobufSerialization)
triCodecType := tripleConstant.CodecType(dubboSerializerType)
diff --git a/registry/directory/directory_test.go
b/registry/directory/directory_test.go
index 22cf34e09..4856d2597 100644
--- a/registry/directory/directory_test.go
+++ b/registry/directory/directory_test.go
@@ -157,14 +157,13 @@ func normalRegistryDir(noMockEvent ...bool)
(*RegistryDirectory, *registry.MockR
Name: "test-application",
}
- url, _ := common.NewURL("mock://127.0.0.1:1111",
- common.WithAttribute(constant.ApplicationKey,
applicationConfig),
- )
+ url, _ := common.NewURL("mock://127.0.0.1:1111")
suburl, _ := common.NewURL(
"dubbo://127.0.0.1:20000/org.apache.dubbo-go.mockService",
common.WithParamsValue(constant.ClusterKey, "mock"),
common.WithParamsValue(constant.GroupKey, "group"),
common.WithParamsValue(constant.VersionKey, "1.0.0"),
+ common.WithParamsValue(constant.ApplicationKey,
applicationConfig.Name),
)
url.SubURL = suburl
mockRegistry, _ := registry.NewMockRegistry(&common.URL{})
diff --git a/registry/protocol/protocol.go b/registry/protocol/protocol.go
index 5611b3d8e..c9b52d550 100644
--- a/registry/protocol/protocol.go
+++ b/registry/protocol/protocol.go
@@ -38,6 +38,7 @@ import (
"dubbo.apache.org/dubbo-go/v3/config"
"dubbo.apache.org/dubbo-go/v3/config_center"
_ "dubbo.apache.org/dubbo-go/v3/config_center/configurator"
+ "dubbo.apache.org/dubbo-go/v3/global"
"dubbo.apache.org/dubbo-go/v3/protocol/base"
"dubbo.apache.org/dubbo-go/v3/protocol/protocolwrapper"
"dubbo.apache.org/dubbo-go/v3/protocol/result"
@@ -122,10 +123,10 @@ func filterHideKey(url *common.URL) *common.URL {
return url.CloneExceptParams(removeSet)
}
-func (proto *registryProtocol) initConfigurationListeners() {
+func (proto *registryProtocol) initConfigurationListeners(url *common.URL) {
proto.overrideListeners = &sync.Map{}
proto.serviceConfigurationListeners = &sync.Map{}
- proto.providerConfigurationListener =
newProviderConfigurationListener(proto.overrideListeners)
+ proto.providerConfigurationListener =
newProviderConfigurationListener(proto.overrideListeners, url)
}
// GetRegistries returns all underlying registry instances.
@@ -185,12 +186,33 @@ func (proto *registryProtocol) Refer(url *common.URL)
base.Invoker {
// Export provider service to registry center
func (proto *registryProtocol) Export(originInvoker base.Invoker)
base.Exporter {
- proto.once.Do(func() {
- proto.initConfigurationListeners()
- })
registryUrl := getRegistryUrl(originInvoker)
providerUrl := getProviderUrl(originInvoker)
+ // Ensure registryUrl has a ShutdownConfig attribute: if it is missing and the config package has none, set the default.
+ // (server layer sets it in ivkURL, which becomes providerUrl here; this block does not copy it from providerUrl)
+ if _, ok := registryUrl.GetAttribute(constant.ShutdownConfigPrefix);
!ok {
+ if config.GetShutDown() == nil {
+ // Fallback to default if config package doesn't have
one
+ registryUrl.SetAttribute(constant.ShutdownConfigPrefix,
global.DefaultShutdownConfig())
+ }
+ }
+
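+ // Ensure registryUrl has an ApplicationKey attribute: if it is missing and the config package has no Application, set the default ApplicationConfig.
+ // NOTE(review): here ApplicationKey is read and written as a URL attribute, not as a URL parameter (application name string) — confirm which form consumers expect.
+ // (server layer sets it in ivkURL, which becomes providerUrl here; this block does not copy it from providerUrl)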
+ if _, ok := registryUrl.GetAttribute(constant.ApplicationKey); !ok {
+ // Fallback to config package for old API compatibility
+ if config.GetRootConfig().Application == nil {
+ // Use the default application config
+ registryUrl.SetAttribute(constant.ApplicationKey,
global.DefaultApplicationConfig())
+ }
+ }
+
+ proto.once.Do(func() {
+ proto.initConfigurationListeners(providerUrl)
+ })
+
overriderUrl := getSubscribedOverrideUrl(providerUrl)
// Deprecated! subscribe to override rules in 2.6.x or before.
overrideSubscribeListener := newOverrideSubscribeListener(overriderUrl,
originInvoker, proto)
@@ -269,26 +291,49 @@ func registerServiceMap(invoker base.Invoker) error {
// such as
dubbo://:20000/org.apache.dubbo.UserProvider?bean.name=UserProvider&cluster=failfast...
id := providerUrl.GetParam(constant.BeanNameKey, "")
- serviceConfig := config.GetProviderConfig().Services[id]
- if serviceConfig == nil {
- s := "reExport can not get serviceConfig"
- return perrors.New(s)
- }
- rpcService := config.GetProviderService(id)
- if rpcService == nil {
- s := "reExport can not get RPCService"
- return perrors.New(s)
+ //TODO: Temporary compatibility with old APIs, can be removed later
+
+ providerConfig := config.GetProviderConfig()
+
+ if providerConfig != nil {
+ if serviceConfig := providerConfig.Services[id]; serviceConfig
!= nil {
+ rpcService := config.GetProviderService(id)
+ if rpcService == nil {
+ return perrors.New("reExport can not get
RPCService")
+ }
+
+ _, err :=
common.ServiceMap.Register(serviceConfig.Interface,
+ serviceConfig.ProtocolIDs[0],
serviceConfig.Group,
+ serviceConfig.Version, rpcService)
+ if err != nil {
+ s := "reExport can not re register ServiceMap.
Error message is " + err.Error()
+ return perrors.New(s)
+ }
+ return nil
+ }
}
- _, err := common.ServiceMap.Register(serviceConfig.Interface,
- // FIXME
- serviceConfig.ProtocolIDs[0], serviceConfig.Group,
- serviceConfig.Version, rpcService)
- if err != nil {
- s := "reExport can not re register ServiceMap. Error message is
" + err.Error()
- return perrors.New(s)
+ if providerConfRaw, ok :=
providerUrl.GetAttribute(constant.ProviderConfigKey); ok {
+ if providerConf, ok :=
providerConfRaw.(*global.ProviderConfig); ok {
+ if serviceConf, ok := providerConf.Services[id]; ok {
+ if serviceConf == nil {
+ return perrors.New("reExport can not
get RPCService")
+ }
+ if rpcService, ok :=
providerUrl.GetAttribute(constant.RpcServiceKey); ok {
+ _, err :=
common.ServiceMap.Register(serviceConf.Interface,
+ serviceConf.ProtocolIDs[0],
serviceConf.Group,
+ serviceConf.Version, rpcService)
+ if err != nil {
+ s := "reExport can not re
register ServiceMap. Error message is " + err.Error()
+ return perrors.New(s)
+ }
+ return nil
+ }
+ }
+ }
}
- return nil
+
+ return perrors.New("reExport can not get serviceConfig of config")
}
type overrideSubscribeListener struct {
@@ -424,9 +469,24 @@ func (proto *registryProtocol) Destroy() {
// close all protocol server after consumerUpdateWait +
stepTimeout(max time wait during
// waitAndAcceptNewRequests procedure)
go func() {
- <-time.After(config.GetShutDown().GetStepTimeout() +
config.GetShutDown().GetConsumerUpdateWaitTime())
- exporter.UnExport()
- proto.bounds.Delete(key)
+ if configShutdown := config.GetShutDown();
configShutdown != nil {
+ <-time.After(configShutdown.GetStepTimeout() +
configShutdown.GetConsumerUpdateWaitTime())
+ exporter.UnExport()
+ proto.bounds.Delete(key)
+ return
+ }
+
+ if shutdownConfRaw, ok :=
exporter.registerUrl.GetAttribute(constant.ShutdownConfigPrefix); ok {
+ if shutdownConfig, ok :=
shutdownConfRaw.(*global.ShutdownConfig); ok {
+ stepTimeout, _ :=
time.ParseDuration(shutdownConfig.StepTimeout)
+ consumerUpdateWaitTime, _ :=
time.ParseDuration(shutdownConfig.ConsumerUpdateWaitTime)
+ <-time.After(stepTimeout +
consumerUpdateWaitTime)
+ exporter.UnExport()
+ proto.bounds.Delete(key)
+ return
+ }
+ }
+
}()
return true
})
@@ -518,14 +578,28 @@ type providerConfigurationListener struct {
overrideListeners *sync.Map
}
-func newProviderConfigurationListener(overrideListeners *sync.Map)
*providerConfigurationListener {
+func newProviderConfigurationListener(overrideListeners *sync.Map, url
*common.URL) *providerConfigurationListener {
listener := &providerConfigurationListener{}
listener.overrideListeners = overrideListeners
+
+ // TODO: Temporary compatibility with old APIs, can be removed later
+ application := config.GetRootConfig().Application
listener.InitWith(
-
config.GetRootConfig().Application.Name+constant.ConfiguratorSuffix,
+ application.Name+constant.ConfiguratorSuffix,
listener,
extension.GetDefaultConfiguratorFunc(),
)
+
+ if ApplicationConfRaw, ok := url.GetAttribute(constant.ApplicationKey);
ok {
+ if ApplicationConfig, ok :=
ApplicationConfRaw.(*global.ApplicationConfig); ok {
+ listener.InitWith(
+
ApplicationConfig.Name+constant.ConfiguratorSuffix,
+ listener,
+ extension.GetDefaultConfiguratorFunc(),
+ )
+ }
+ }
+
return listener
}
diff --git a/registry/protocol/protocol_test.go
b/registry/protocol/protocol_test.go
index 0e4f8017d..d2776431d 100644
--- a/registry/protocol/protocol_test.go
+++ b/registry/protocol/protocol_test.go
@@ -34,9 +34,9 @@ import (
common_cfg "dubbo.apache.org/dubbo-go/v3/common/config"
"dubbo.apache.org/dubbo-go/v3/common/constant"
"dubbo.apache.org/dubbo-go/v3/common/extension"
- "dubbo.apache.org/dubbo-go/v3/config"
"dubbo.apache.org/dubbo-go/v3/config_center"
"dubbo.apache.org/dubbo-go/v3/config_center/configurator"
+ "dubbo.apache.org/dubbo-go/v3/global"
"dubbo.apache.org/dubbo-go/v3/protocol/base"
"dubbo.apache.org/dubbo-go/v3/protocol/protocolwrapper"
"dubbo.apache.org/dubbo-go/v3/registry"
@@ -44,13 +44,6 @@ import (
"dubbo.apache.org/dubbo-go/v3/remoting"
)
-func init() {
- config.SetRootConfig(config.RootConfig{
- Application: &config.ApplicationConfig{Name:
"test-application"},
- Shutdown: &config.ShutdownConfig{StepTimeout: "0s"},
- })
-}
-
func referNormal(t *testing.T, regProtocol *registryProtocol) {
extension.SetProtocol("registry", GetProtocol)
extension.SetRegistry("mock", registry.NewMockRegistry)
@@ -58,10 +51,22 @@ func referNormal(t *testing.T, regProtocol
*registryProtocol) {
extension.SetCluster("mock", cluster.NewMockCluster)
extension.SetDirectory("mock", directory.NewRegistryDirectory)
- url, _ := common.NewURL("mock://127.0.0.1:1111")
+ shutdownConfig := &global.ShutdownConfig{
+ StepTimeout: "4s",
+ ConsumerUpdateWaitTime: "4s",
+ }
+
+ applicationConfig := &global.ApplicationConfig{
+ Name: "test-application",
+ }
+
+ url, _ := common.NewURL("mock://127.0.0.1:1111",
+ common.WithAttribute(constant.ShutdownConfigPrefix,
shutdownConfig),
+ )
suburl, _ := common.NewURL(
"dubbo://127.0.0.1:20000//",
common.WithParamsValue(constant.ClusterKey, "mock"),
+ common.WithParamsValue(constant.ApplicationKey,
applicationConfig.Name),
)
url.SubURL = suburl
@@ -72,9 +77,6 @@ func referNormal(t *testing.T, regProtocol *registryProtocol)
{
}
func TestRefer(t *testing.T) {
- config.SetRootConfig(config.RootConfig{
- Application: &config.ApplicationConfig{Name:
"test-application"},
- })
regProtocol := newRegistryProtocol()
referNormal(t, regProtocol)
}
@@ -124,12 +126,48 @@ func exporterNormal(t *testing.T, regProtocol
*registryProtocol) *common.URL {
extension.SetProtocol("registry", GetProtocol)
extension.SetRegistry("mock", registry.NewMockRegistry)
extension.SetProtocol(protocolwrapper.FILTER,
protocolwrapper.NewMockProtocolFilter)
- url, _ := common.NewURL("mock://127.0.0.1:1111")
+
+ // Clear ServiceMap to avoid "service already defined" errors between
tests
+ common.ServiceMap.UnRegister("org.apache.dubbo-go.mockService",
"dubbo", "group/org.apache.dubbo-go.mockService:1.0.0")
+
+ shutdownConfig := &global.ShutdownConfig{
+ StepTimeout: "4s",
+ ConsumerUpdateWaitTime: "4s",
+ }
+
+ applicationConfig := &global.ApplicationConfig{
+ Name: "test-application",
+ }
+
+ // Create service config for registerServiceMap
+ serviceConfig := &global.ServiceConfig{
+ Interface: "org.apache.dubbo-go.mockService",
+ ProtocolIDs: []string{"dubbo"},
+ Group: "group",
+ Version: "1.0.0",
+ }
+
+ providerConfig := &global.ProviderConfig{
+ Services: map[string]*global.ServiceConfig{
+ "org.apache.dubbo-go.mockService": serviceConfig,
+ },
+ }
+
+ // Create a mock RPCService with exported methods
+ mockRPCService := &MockRPCService{}
+
+ url, _ := common.NewURL("mock://127.0.0.1:1111",
+ common.WithAttribute(constant.ShutdownConfigPrefix,
shutdownConfig),
+ )
suburl, _ := common.NewURL(
"dubbo://127.0.0.1:20000/org.apache.dubbo-go.mockService",
common.WithParamsValue(constant.ClusterKey, "mock"),
common.WithParamsValue(constant.GroupKey, "group"),
common.WithParamsValue(constant.VersionKey, "1.0.0"),
+ common.WithParamsValue(constant.BeanNameKey,
"org.apache.dubbo-go.mockService"),
+ common.WithAttribute(constant.ApplicationKey,
applicationConfig),
+ common.WithAttribute(constant.ProviderConfigPrefix,
providerConfig),
+ common.WithAttribute(constant.RpcServiceKey, mockRPCService),
)
url.SubURL = suburl
@@ -179,12 +217,45 @@ func TestOneRegAndProtoExporter(t *testing.T) {
regProtocol := newRegistryProtocol()
exporterNormal(t, regProtocol)
- url2, _ := common.NewURL("mock://127.0.0.1:1111")
+ // The test expects that exporting the same service to the same registry
+ // should reuse the same bound (not create a new one)
+ // So we export the exact same service again
+ shutdownConfig := &global.ShutdownConfig{
+ StepTimeout: "4s",
+ ConsumerUpdateWaitTime: "4s",
+ }
+
+ applicationConfig := &global.ApplicationConfig{
+ Name: "test-application",
+ }
+
+ serviceConfig := &global.ServiceConfig{
+ Interface: "org.apache.dubbo-go.mockService",
+ ProtocolIDs: []string{"dubbo"},
+ Group: "group",
+ Version: "1.0.0",
+ }
+
+ providerConfig := &global.ProviderConfig{
+ Services: map[string]*global.ServiceConfig{
+ "org.apache.dubbo-go.mockService": serviceConfig,
+ },
+ }
+
+ mockRPCService := &MockRPCService{}
+
+ url2, _ := common.NewURL("mock://127.0.0.1:1111",
+ common.WithAttribute(constant.ShutdownConfigPrefix,
shutdownConfig),
+ )
suburl2, _ := common.NewURL(
"dubbo://127.0.0.1:20000/org.apache.dubbo-go.mockService",
common.WithParamsValue(constant.ClusterKey, "mock"),
common.WithParamsValue(constant.GroupKey, "group"),
common.WithParamsValue(constant.VersionKey, "1.0.0"),
+ common.WithParamsValue(constant.BeanNameKey,
"org.apache.dubbo-go.mockService"),
+ common.WithAttribute(constant.ApplicationKey,
applicationConfig),
+ common.WithAttribute(constant.ProviderConfigPrefix,
providerConfig),
+ common.WithAttribute(constant.RpcServiceKey, mockRPCService),
)
url2.SubURL = suburl2
@@ -203,6 +274,7 @@ func TestOneRegAndProtoExporter(t *testing.T) {
count2++
return true
})
+ // Should still be 1 because we're exporting the same service (same
cache key)
assert.Equal(t, count2, 1)
}
@@ -251,7 +323,9 @@ func TestExportWithServiceConfig(t *testing.T) {
extension.SetDefaultConfigurator(configurator.NewMockConfigurator)
ccUrl, _ := common.NewURL("mock://127.0.0.1:1111")
dc, _ :=
(&config_center.MockDynamicConfigurationFactory{}).GetDynamicConfiguration(ccUrl)
+ // Use common/config (not dubbo.apache.org/dubbo-go/v3/config)
common_cfg.GetEnvInstance().SetDynamicConfiguration(dc)
+
regProtocol := newRegistryProtocol()
url := exporterNormal(t, regProtocol)
if _, loaded := regProtocol.registries.Load(url.PrimitiveURL); !loaded {
@@ -274,7 +348,9 @@ func TestExportWithApplicationConfig(t *testing.T) {
extension.SetDefaultConfigurator(configurator.NewMockConfigurator)
ccUrl, _ := common.NewURL("mock://127.0.0.1:1111")
dc, _ :=
(&config_center.MockDynamicConfigurationFactory{}).GetDynamicConfiguration(ccUrl)
+ // Use common/config (not dubbo.apache.org/dubbo-go/v3/config)
common_cfg.GetEnvInstance().SetDynamicConfiguration(dc)
+
regProtocol := newRegistryProtocol()
url := exporterNormal(t, regProtocol)
if _, loaded := regProtocol.registries.Load(url.PrimitiveURL); !loaded {
@@ -298,3 +374,16 @@ func TestGetProviderUrlWithHideKey(t *testing.T) {
assert.NotContains(t, providerUrl.GetParams(), ".d")
assert.Contains(t, providerUrl.GetParams(), "a")
}
+
+// MockRPCService is a mock RPC service for testing
+type MockRPCService struct{}
+
+// MockMethod is a mock method that satisfies RPC method requirements
+func (m *MockRPCService) MockMethod(arg1, arg2 string) error {
+ return nil
+}
+
+// Reference returns the reference path
+func (m *MockRPCService) Reference() string {
+ return "org.apache.dubbo-go.mockService"
+}
diff --git a/remoting/getty/config.go b/remoting/getty/config.go
index 992af254c..bc43de779 100644
--- a/remoting/getty/config.go
+++ b/remoting/getty/config.go
@@ -28,7 +28,7 @@ import (
)
import (
- "dubbo.apache.org/dubbo-go/v3/config"
+ "dubbo.apache.org/dubbo-go/v3/common/constant"
)
const (
@@ -214,13 +214,13 @@ func (c *ClientConfig) CheckValidity() error {
return perrors.WithMessagef(err,
"time.ParseDuration(HeartbeatPeroid{%#v})", c.HeartbeatPeriod)
}
- if c.heartbeatPeriod >= time.Duration(config.MaxWheelTimeSpan) {
+ if c.heartbeatPeriod >= time.Duration(constant.MaxWheelTimeSpan) {
return perrors.WithMessagef(err, "heartbeat-period %s should be
less than %s",
- c.HeartbeatPeriod,
time.Duration(config.MaxWheelTimeSpan))
+ c.HeartbeatPeriod,
time.Duration(constant.MaxWheelTimeSpan))
}
if len(c.HeartbeatTimeout) == 0 {
- c.heartbeatTimeout = 60 * time.Second
+ c.heartbeatTimeout = constant.DefaultHeartbeatTimeout
} else if c.heartbeatTimeout, err =
time.ParseDuration(c.HeartbeatTimeout); err != nil {
return perrors.WithMessagef(err,
"time.ParseDuration(HeartbeatTimeout{%#v})", c.HeartbeatTimeout)
}
@@ -237,18 +237,18 @@ func (c *ServerConfig) CheckValidity() error {
var err error
if len(c.HeartbeatPeriod) == 0 {
- c.heartbeatPeriod = 60 * time.Second
+ c.heartbeatPeriod = constant.DefaultHeartbeatTimeout
} else if c.heartbeatPeriod, err =
time.ParseDuration(c.HeartbeatPeriod); err != nil {
return perrors.WithMessagef(err,
"time.ParseDuration(HeartbeatPeroid{%#v})", c.HeartbeatPeriod)
}
- if c.heartbeatPeriod >= time.Duration(config.MaxWheelTimeSpan) {
+ if c.heartbeatPeriod >= time.Duration(constant.MaxWheelTimeSpan) {
return perrors.WithMessagef(err, "heartbeat-period %s should be
less than %s",
- c.HeartbeatPeriod,
time.Duration(config.MaxWheelTimeSpan))
+ c.HeartbeatPeriod,
time.Duration(constant.MaxWheelTimeSpan))
}
if len(c.HeartbeatTimeout) == 0 {
- c.heartbeatTimeout = 60 * time.Second
+ c.heartbeatTimeout = constant.DefaultHeartbeatTimeout
} else if c.heartbeatTimeout, err =
time.ParseDuration(c.HeartbeatTimeout); err != nil {
return perrors.WithMessagef(err,
"time.ParseDuration(HeartbeatTimeout{%#v})", c.HeartbeatTimeout)
}
@@ -257,9 +257,9 @@ func (c *ServerConfig) CheckValidity() error {
return perrors.WithMessagef(err,
"time.ParseDuration(SessionTimeout{%#v})", c.SessionTimeout)
}
- if c.sessionTimeout >= time.Duration(config.MaxWheelTimeSpan) {
+ if c.sessionTimeout >= time.Duration(constant.MaxWheelTimeSpan) {
return perrors.WithMessagef(err, "session-timeout %s should be
less than %s",
- c.SessionTimeout,
time.Duration(config.MaxWheelTimeSpan))
+ c.SessionTimeout,
time.Duration(constant.MaxWheelTimeSpan))
}
return perrors.WithStack(c.GettySessionParam.CheckValidity())
diff --git a/remoting/getty/getty_client.go b/remoting/getty/getty_client.go
index 9fe25b748..bbbe743ff 100644
--- a/remoting/getty/getty_client.go
+++ b/remoting/getty/getty_client.go
@@ -38,6 +38,7 @@ import (
)
import (
+ "dubbo.apache.org/dubbo-go/v3"
"dubbo.apache.org/dubbo-go/v3/common"
"dubbo.apache.org/dubbo-go/v3/common/constant"
"dubbo.apache.org/dubbo-go/v3/config"
@@ -64,20 +65,52 @@ func initClient(url *common.URL) {
// load client config from rootConfig.Protocols
// default use dubbo
- if config.GetApplicationConfig() == nil {
+ // TODO: Temporary compatibility with old APIs, can be removed later
+ if url.GetParam(constant.ApplicationKey, "") == "" &&
config.GetApplicationConfig() == nil {
return
}
- if config.GetRootConfig().Protocols == nil {
+
+ // TODO: Temporary compatibility with old APIs, can be removed later
+ if url.GetParam(constant.ProtocolKey, "") == "" &&
config.GetRootConfig().Protocols == nil {
return
}
- protocolConf := config.GetRootConfig().Protocols[url.Protocol]
+ // TODO: Temporary compatibility with old APIs, can be removed later
+ protocolConfMap :=
dubbo.CompatGlobalProtocolConfigMap(config.GetRootConfig().Protocols)
+ if protocolConfMap == nil {
+ if protocolConfRaw, ok :=
url.GetAttribute(constant.ProtocolConfigKey); ok {
+ protocolConfig, ok :=
protocolConfRaw.(map[string]*global.ProtocolConfig)
+ if !ok {
+ logger.Warnf("protocolConfig assert failed")
+ return
+ }
+ if protocolConfig == nil {
+ logger.Warnf("protocolConfig is nil")
+ return
+ }
+ protocolConfMap = protocolConfig
+ }
+ }
+
+ protocolConf := protocolConfMap[url.Protocol]
if protocolConf == nil {
logger.Info("use default getty client config")
return
} else {
//client tls config
- tlsConfig := config.GetRootConfig().TLSConfig
+ tlsConfig :=
dubbo.CompatGlobalTLSConfig(config.GetRootConfig().TLSConfig)
+
+ if tlsConfig == nil {
+ if tlsConfRaw, ok :=
url.GetAttribute(constant.TLSConfigKey); ok {
+ tlsConf, ok := tlsConfRaw.(*global.TLSConfig)
+ if !ok {
+ logger.Errorf("Getty client initialized
the TLSConfig configuration failed")
+ return
+ }
+ tlsConfig = tlsConf
+ }
+ }
+
if tlsConfig != nil {
clientConf.SSLEnabled = true
clientConf.TLSBuilder = &getty.ClientTlsConfigBuilder{
diff --git a/remoting/getty/getty_client_test.go
b/remoting/getty/getty_client_test.go
index 574ab6ad1..0667d695c 100644
--- a/remoting/getty/getty_client_test.go
+++ b/remoting/getty/getty_client_test.go
@@ -36,8 +36,9 @@ import (
import (
"dubbo.apache.org/dubbo-go/v3/common"
- . "dubbo.apache.org/dubbo-go/v3/common/constant"
+ "dubbo.apache.org/dubbo-go/v3/common/constant"
"dubbo.apache.org/dubbo-go/v3/config"
+ "dubbo.apache.org/dubbo-go/v3/global"
"dubbo.apache.org/dubbo-go/v3/protocol/base"
"dubbo.apache.org/dubbo-go/v3/protocol/invocation"
"dubbo.apache.org/dubbo-go/v3/protocol/result"
@@ -58,7 +59,7 @@ func testRequestOneWay(t *testing.T, client *Client) {
request := remoting.NewRequest("2.0.2")
invocation := createInvocation("GetUser", nil, nil, []any{"1",
"username"},
[]reflect.Value{reflect.ValueOf("1"),
reflect.ValueOf("username")})
- attachment := map[string]string{InterfaceKey:
"com.ikurento.user.UserProvider"}
+ attachment := map[string]string{constant.InterfaceKey:
"com.ikurento.user.UserProvider"}
setAttachment(invocation, attachment)
request.Data = invocation
request.Event = false
@@ -97,7 +98,7 @@ func testClient_AsyncCall(t *testing.T, client *Client) {
request := remoting.NewRequest("2.0.2")
invocation := createInvocation("GetUser0", nil, nil, []any{"4", nil,
"username"},
[]reflect.Value{reflect.ValueOf("4"), reflect.ValueOf(nil),
reflect.ValueOf("username")})
- attachment := map[string]string{InterfaceKey:
"com.ikurento.user.UserProvider"}
+ attachment := map[string]string{constant.InterfaceKey:
"com.ikurento.user.UserProvider"}
setAttachment(invocation, attachment)
request.Data = invocation
request.Event = false
@@ -271,7 +272,8 @@ func (u User) JavaClassName() string {
return "com.ikurento.user.User"
}
-func TestInitClient(t *testing.T) {
+// TODO: Temporary compatibility with old APIs, can be removed later
+func TestInitClientOldApi(t *testing.T) {
originRootConf := config.GetRootConfig()
rootConf := config.RootConfig{
Protocols: map[string]*config.ProtocolConfig{
@@ -285,7 +287,21 @@ func TestInitClient(t *testing.T) {
config.SetRootConfig(rootConf)
url, err := common.NewURL("dubbo://127.0.0.1:20003/test")
assert.Nil(t, err)
- initServer(url)
+ initClient(url)
config.SetRootConfig(*originRootConf)
assert.NotNil(t, srvConf)
}
+
+func TestInitClient(t *testing.T) {
+ url, err := common.NewURL("dubbo://127.0.0.1:20003/test")
+ assert.Nil(t, err)
+ url.SetAttribute(constant.ProtocolConfigKey,
map[string]*global.ProtocolConfig{
+ "dubbo": {
+ Name: "dubbo",
+ Ip: "127.0.0.1",
+ Port: "20003",
+ },
+ })
+ url.SetAttribute(constant.ApplicationKey, global.ApplicationConfig{})
+ initClient(url)
+}
diff --git a/remoting/getty/getty_server.go b/remoting/getty/getty_server.go
index 85df629e9..49cddadff 100644
--- a/remoting/getty/getty_server.go
+++ b/remoting/getty/getty_server.go
@@ -35,6 +35,7 @@ import (
)
import (
+ "dubbo.apache.org/dubbo-go/v3"
"dubbo.apache.org/dubbo-go/v3/common"
"dubbo.apache.org/dubbo-go/v3/common/constant"
"dubbo.apache.org/dubbo-go/v3/config"
@@ -56,20 +57,52 @@ func initServer(url *common.URL) {
// load server config from rootConfig.Protocols
// default use dubbo
- if config.GetApplicationConfig() == nil {
+ // TODO: Temporary compatibility with old APIs, can be removed later
+ if url.GetParam(constant.ApplicationKey, "") == "" &&
config.GetApplicationConfig() == nil {
return
}
- if config.GetRootConfig().Protocols == nil {
+
+ // TODO: Temporary compatibility with old APIs, can be removed later
+ if url.GetParam(constant.ProtocolKey, "") == "" &&
config.GetRootConfig().Protocols == nil {
return
}
- protocolConf := config.GetRootConfig().Protocols[url.Protocol]
+ // TODO: Temporary compatibility with old APIs, can be removed later
+ protocolConfMap :=
dubbo.CompatGlobalProtocolConfigMap(config.GetRootConfig().Protocols)
+ if protocolConfMap == nil {
+ if protocolConfRaw, ok :=
url.GetAttribute(constant.ProtocolConfigKey); ok {
+ protocolConfig, ok :=
protocolConfRaw.(map[string]*global.ProtocolConfig)
+ if !ok {
+ logger.Warnf("protocolConfig assert failed")
+ return
+ }
+ if protocolConfig == nil {
+ logger.Warnf("protocolConfig is nil")
+ return
+ }
+ protocolConfMap = protocolConfig
+ }
+ }
+
+ protocolConf := protocolConfMap[url.Protocol]
if protocolConf == nil {
logger.Debug("use default getty server config")
return
} else {
//server tls config
- tlsConfig := config.GetRootConfig().TLSConfig
+ tlsConfig :=
dubbo.CompatGlobalTLSConfig(config.GetRootConfig().TLSConfig)
+
+ if tlsConfig == nil {
+ if tlsConfRaw, ok :=
url.GetAttribute(constant.TLSConfigKey); ok {
+ tlsConf, ok := tlsConfRaw.(*global.TLSConfig)
+ if !ok {
+ logger.Errorf("Getty client initialized
the TLSConfig configuration failed")
+ return
+ }
+ tlsConfig = tlsConf
+ }
+ }
+
if tlsConfig != nil {
srvConf.SSLEnabled = true
srvConf.TLSBuilder = &getty.ServerTlsConfigBuilder{
diff --git a/remoting/getty/getty_server_test.go
b/remoting/getty/getty_server_test.go
index d51da99a7..3eebf7033 100644
--- a/remoting/getty/getty_server_test.go
+++ b/remoting/getty/getty_server_test.go
@@ -27,10 +27,13 @@ import (
import (
"dubbo.apache.org/dubbo-go/v3/common"
+ "dubbo.apache.org/dubbo-go/v3/common/constant"
"dubbo.apache.org/dubbo-go/v3/config"
+ "dubbo.apache.org/dubbo-go/v3/global"
)
-func TestInitServer(t *testing.T) {
+// TODO: Temporary compatibility with old APIs, can be removed later
+func TestInitServerOldApi(t *testing.T) {
originRootConf := config.GetRootConfig()
rootConf := config.RootConfig{
Protocols: map[string]*config.ProtocolConfig{
@@ -48,3 +51,17 @@ func TestInitServer(t *testing.T) {
config.SetRootConfig(*originRootConf)
assert.NotNil(t, srvConf)
}
+
+func TestInitServer(t *testing.T) {
+ url, err := common.NewURL("dubbo://127.0.0.1:20003/test")
+ assert.Nil(t, err)
+ url.SetAttribute(constant.ProtocolConfigKey,
map[string]*global.ProtocolConfig{
+ "dubbo": {
+ Name: "dubbo",
+ Ip: "127.0.0.1",
+ Port: "20003",
+ },
+ })
+ url.SetAttribute(constant.ApplicationKey, global.ApplicationConfig{})
+ initServer(url)
+}
diff --git a/server/action.go b/server/action.go
index be33fb819..d9942c90b 100644
--- a/server/action.go
+++ b/server/action.go
@@ -124,24 +124,9 @@ func getRandomPort(protocolConfigs
[]*global.ProtocolConfig) *list.List {
return ports
}
-func (svcOpts *ServiceOptions) ExportWithoutInfo() error {
- return svcOpts.export(nil)
-}
-
-func (svcOpts *ServiceOptions) ExportWithInfo(info *common.ServiceInfo) error {
- return svcOpts.export(info)
-}
-
-func (svcOpts *ServiceOptions) export(info *common.ServiceInfo) error {
+func (svcOpts *ServiceOptions) Export() error {
+ info := svcOpts.info
svcConf := svcOpts.Service
- if info != nil {
- if svcConf.Interface == "" {
- svcConf.Interface = info.InterfaceName
- }
- svcOpts.info = info
- }
-
- svcOpts.Id = common.GetReference(svcOpts.rpcService)
// TODO: delay needExport
if svcOpts.unexported != nil && svcOpts.unexported.Load() {
@@ -220,6 +205,18 @@ func (svcOpts *ServiceOptions) export(info
*common.ServiceInfo) error {
// TODO: remove IDL value when version 4.0.0
common.WithParamsValue(constant.IDLMode, isIDL),
+
+ // application name
+ common.WithAttribute(constant.ApplicationKey,
svcOpts.Application),
+
+ // shutdown config
+ common.WithAttribute(constant.ShutdownConfigPrefix,
svcOpts.srvOpts.Shutdown),
+
+ // provider info
+ common.WithAttribute(constant.ProviderConfigKey,
svcOpts.srvOpts.Provider),
+
+ // protocol conf
+ common.WithAttribute(constant.ProtocolConfigKey,
svcOpts.Protocols),
)
if info != nil {
diff --git a/server/server.go b/server/server.go
index 1d531f1da..f2e19bbe2 100644
--- a/server/server.go
+++ b/server/server.go
@@ -39,13 +39,21 @@ import (
)
// internalProServices are for internal services
-var proServices = make([]*InternalService, 0, 16)
-var proLock sync.Mutex
+var internalProServices = make([]*InternalService, 0, 16)
+var internalProLock sync.Mutex
type Server struct {
cfg *ServerOptions
- svcOptsMap sync.Map
+ mu sync.RWMutex
+ // key: service ID (ServiceOptions.Id), value: *ServiceOptions
+ //proServices map[string]common.RPCService
+ // change any to *common.ServiceInfo @see config/service.go
+ svcOptsMap map[string]*ServiceOptions
+ // key is interface name, value is *ServiceOptions
+ interfaceNameServices map[string]*ServiceOptions
+ // indicate whether the server is already started
+ serve bool
}
// ServiceInfo Deprecated: common.ServiceInfo type alias, just for compatible
with old generate pb.go file
@@ -62,32 +70,33 @@ type ServiceDefinition struct {
// Register assemble invoker chains like ProviderConfig.Load, init a service
per call
func (s *Server) Register(handler any, info *common.ServiceInfo, opts
...ServiceOption) error {
- baseOpts := []ServiceOption{WithIDLMode(constant.IDL)}
- baseOpts = append(baseOpts, opts...)
- newSvcOpts, err := s.genSvcOpts(handler, baseOpts...)
- if err != nil {
- return err
- }
- s.svcOptsMap.Store(newSvcOpts, info)
- return nil
+ return s.registerWithMode(handler, info, constant.IDL, opts...)
}
// RegisterService is for new Triple non-idl mode implement.
func (s *Server) RegisterService(handler any, opts ...ServiceOption) error {
+ return s.registerWithMode(handler, nil, constant.NONIDL, opts...)
+}
+
+// registerWithMode is the registration logic shared by Register and RegisterService
+func (s *Server) registerWithMode(handler any, info *common.ServiceInfo,
idlMode string, opts ...ServiceOption) error {
baseOpts := []ServiceOption{
- WithIDLMode(constant.NONIDL),
- WithInterface(common.GetReference(handler)),
+ WithIDLMode(idlMode),
+ }
+ // only need to explicitly set interface in NONIDL mode
+ if idlMode == constant.NONIDL {
+ baseOpts = append(baseOpts,
WithInterface(common.GetReference(handler)))
}
baseOpts = append(baseOpts, opts...)
- newSvcOpts, err := s.genSvcOpts(handler, baseOpts...)
+ newSvcOpts, err := s.genSvcOpts(handler, info, baseOpts...)
if err != nil {
return err
}
- s.svcOptsMap.Store(newSvcOpts, nil)
+ s.registerServiceOptions(newSvcOpts)
return nil
}
-func (s *Server) genSvcOpts(handler any, opts ...ServiceOption)
(*ServiceOptions, error) {
+func (s *Server) genSvcOpts(handler any, info *common.ServiceInfo, opts
...ServiceOption) (*ServiceOptions, error) {
if s.cfg == nil {
return nil, errors.New("Server has not been initialized, please
use NewServer() to create Server")
}
@@ -154,41 +163,55 @@ func (s *Server) genSvcOpts(handler any, opts
...ServiceOption) (*ServiceOptions
if err := newSvcOpts.init(s, svcOpts...); err != nil {
return nil, err
}
+ svcConf := newSvcOpts.Service
+ if info != nil {
+ if svcConf.Interface == "" {
+ svcConf.Interface = info.InterfaceName
+ }
+ newSvcOpts.info = info
+ }
+ newSvcOpts.Id = interfaceName
newSvcOpts.Implement(handler)
+ newSvcOpts.info = enhanceServiceInfo(info)
return newSvcOpts, nil
}
-func (s *Server) exportServices() (err error) {
- s.svcOptsMap.Range(func(svcOptsRaw, infoRaw any) bool {
- svcOpts := svcOptsRaw.(*ServiceOptions)
- if info, ok := infoRaw.(*common.ServiceInfo); !ok || info ==
nil {
- err = svcOpts.ExportWithoutInfo()
- } else {
- // Add a method with a name of a different first-letter
case
- // to achieve interoperability with java
- // TODO: The method name case sensitivity in Dubbo-java
should be addressed.
- // We ought to make changes to handle this issue.
- var additionalMethods []common.MethodInfo
- for _, method := range info.Methods {
- newMethod := method
- newMethod.Name =
dubboutil.SwapCaseFirstRune(method.Name)
- additionalMethods = append(additionalMethods,
newMethod)
- }
-
- info.Methods = append(info.Methods,
additionalMethods...)
+// enhanceServiceInfo appends, for each method in info, a copy whose name has
+// the case of its first letter swapped, to achieve interoperability with java.
+// TODO: The method name case sensitivity in Dubbo-java should be addressed.
+// We ought to make changes to handle this issue.
+func enhanceServiceInfo(info *common.ServiceInfo) *common.ServiceInfo {
+ if info == nil {
+ return info
+ }
+ var additionalMethods []common.MethodInfo
+ for _, method := range info.Methods {
+ newMethod := method
+ newMethod.Name = dubboutil.SwapCaseFirstRune(method.Name)
+ additionalMethods = append(additionalMethods, newMethod)
+ }
+ info.Methods = append(info.Methods, additionalMethods...)
+ return info
+}
- err = svcOpts.ExportWithInfo(info)
- }
- if err != nil {
+func (s *Server) exportServices() error {
+ for _, svcOpts := range s.svcOptsMap {
+ if err := svcOpts.Export(); err != nil {
logger.Errorf("export %s service failed, err: %s",
svcOpts.Service.Interface, err)
- return false
+ return errors.Wrapf(err, "failed to export service %s",
svcOpts.Service.Interface)
}
- return true
- })
- return err
+ }
+ return nil
}
func (s *Server) Serve() error {
+ s.mu.Lock()
+ defer s.mu.Unlock()
+ if s.serve {
+ return errors.New("server has already been started")
+ }
+ // prevent multiple calls to Serve
+ s.serve = true
// the registryConfig in ServiceOptions and ServerOptions all need to
init a metadataReporter,
// when ServiceOptions.init() is called we don't know if a new registry
config is set in the future use serviceOption
if err := metadata.InitRegistryMetadataReport(s.cfg.Registries); err !=
nil {
@@ -224,11 +247,11 @@ func (s *Server) exportInternalServices() error {
cfg.Protocols = s.cfg.Protocols
cfg.Registries = s.cfg.Registries
- services := make([]*InternalService, 0, len(proServices))
+ services := make([]*InternalService, 0, len(internalProServices))
- proLock.Lock()
- defer proLock.Unlock()
- for _, service := range proServices {
+ internalProLock.Lock()
+ defer internalProLock.Unlock()
+ for _, service := range internalProServices {
if service.Init == nil {
return errors.New("[internal service]internal service
init func is empty, please set the init func correctly")
}
@@ -237,7 +260,7 @@ func (s *Server) exportInternalServices() error {
logger.Infof("[internal service]%s service will not
expose", service.Name)
continue
}
- newSvcOpts, err := s.genSvcOpts(sd.Handler, sd.Opts...)
+ newSvcOpts, err := s.genSvcOpts(sd.Handler, sd.Info, sd.Opts...)
if err != nil {
return err
}
@@ -254,7 +277,7 @@ func (s *Server) exportInternalServices() error {
if service.BeforeExport != nil {
service.BeforeExport(service.svcOpts)
}
- err := service.svcOpts.ExportWithInfo(service.info)
+ err := service.svcOpts.Export()
if service.AfterExport != nil {
service.AfterExport(service.svcOpts, err)
}
@@ -316,7 +339,9 @@ func NewServer(opts ...ServerOption) (*Server, error) {
}
srv := &Server{
- cfg: newSrvOpts,
+ cfg: newSrvOpts,
+ svcOptsMap: make(map[string]*ServiceOptions),
+ interfaceNameServices: make(map[string]*ServiceOptions),
}
return srv, nil
}
@@ -326,7 +351,54 @@ func SetProviderServices(sd *InternalService) {
logger.Warnf("[internal service]internal name is empty, please
set internal name")
return
}
- proLock.Lock()
- defer proLock.Unlock()
- proServices = append(proServices, sd)
+ internalProLock.Lock()
+ defer internalProLock.Unlock()
+ internalProServices = append(internalProServices, sd)
+}
+
+func (s *Server) registerServiceOptions(serviceOptions *ServiceOptions) {
+ s.mu.Lock()
+ defer s.mu.Unlock()
+ logger.Infof("A provider service %s was registered successfully.",
serviceOptions.Id)
+ s.svcOptsMap[serviceOptions.Id] = serviceOptions
+ if serviceOptions.Service != nil && serviceOptions.Service.Interface !=
"" {
+ s.interfaceNameServices[serviceOptions.Service.Interface] =
serviceOptions
+ }
+}
+
+// GetServiceOptions retrieves the ServiceOptions for a service by its name/ID
+func (s *Server) GetServiceOptions(name string) *ServiceOptions {
+ s.mu.RLock()
+ defer s.mu.RUnlock()
+ return s.svcOptsMap[name]
+}
+
+// GetServiceInfo retrieves the ServiceInfo for a service by its name/ID
+// Returns nil if the service is not found or has no ServiceInfo
+func (s *Server) GetServiceInfo(name string) *common.ServiceInfo {
+ s.mu.RLock()
+ defer s.mu.RUnlock()
+ if svcOpts, ok := s.svcOptsMap[name]; ok {
+ return svcOpts.info
+ }
+ return nil
+}
+
+// GetRPCService retrieves the RPCService implementation for a service by its
name/ID
+// Returns nil if the service is not found or has no RPCService
+func (s *Server) GetRPCService(name string) common.RPCService {
+ s.mu.RLock()
+ defer s.mu.RUnlock()
+ if svcOpts, ok := s.svcOptsMap[name]; ok {
+ return svcOpts.rpcService
+ }
+ return nil
+}
+
+// GetServiceOptionsByInterfaceName retrieves the ServiceOptions for a service
by its interface name
+// Returns nil if no service is found with the given interface name
+func (s *Server) GetServiceOptionsByInterfaceName(interfaceName string)
*ServiceOptions {
+ s.mu.RLock()
+ defer s.mu.RUnlock()
+ return s.interfaceNameServices[interfaceName]
}
diff --git a/tools/protoc-gen-triple-openapi/constant/format.go
b/tools/protoc-gen-triple-openapi/constant/format.go
index 53ca92875..59b3b0d59 100644
--- a/tools/protoc-gen-triple-openapi/constant/format.go
+++ b/tools/protoc-gen-triple-openapi/constant/format.go
@@ -25,6 +25,6 @@ const (
// json format
const (
- JSONFormat = "json"
+ JSONFormat = "json"
JSONFormatSuffix = ".triple.openapi.json"
)