This is an automated email from the ASF dual-hosted git repository.
liujun pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/dubbo-go.git
The following commit(s) were added to refs/heads/main by this push:
new 2e54f26c9 feat:add internal service extension (#2577)
2e54f26c9 is described below
commit 2e54f26c98f6ddd1bac7026197b634cb3bafdf3c
Author: finalt <[email protected]>
AuthorDate: Fri Jan 26 16:05:39 2024 +0800
feat:add internal service extension (#2577)
fixes #2565, #2539
---
common/constant/key.go | 10 +++
protocol/triple/health/healthServer.go | 18 ++--
protocol/triple/reflection/serverreflection.go | 16 ++--
server/server.go | 117 +++++++++++++++++++++----
4 files changed, 132 insertions(+), 29 deletions(-)
diff --git a/common/constant/key.go b/common/constant/key.go
index e3dfa8760..5dd9dcb0c 100644
--- a/common/constant/key.go
+++ b/common/constant/key.go
@@ -17,6 +17,8 @@
package constant
+import "math"
+
type DubboCtxKey string
const (
@@ -439,3 +441,11 @@ const (
DefaultMetaFileName = "dubbo.metadata."
DefaultEntrySize = 100
)
+
+// priority
+const (
+ DefaultPriority = 0
+ HighestPriority = math.MinInt32
+ // LowestPriority for metadata service
+ LowestPriority = math.MaxInt32
+)
diff --git a/protocol/triple/health/healthServer.go b/protocol/triple/health/healthServer.go
index aca420d27..5bd89063e 100644
--- a/protocol/triple/health/healthServer.go
+++ b/protocol/triple/health/healthServer.go
@@ -172,14 +172,18 @@ func (srv *HealthTripleServer) Resume() {
func init() {
healthServer = NewServer()
-
internal.HealthSetServingStatusServing = SetServingStatusServing
-
- server.SetProServices(&server.ServiceDefinition{
- Handler: healthServer,
- Info: &triple_health.Health_ServiceInfo,
- Opts: []server.ServiceOption{server.WithNotRegister(),
-
server.WithInterface(constant.HealthCheckServiceInterface)},
+ server.SetProServices(&server.InternalService{
+ Name: "healthCheck",
+ Init: func(options *server.ServiceOptions)
(*server.ServiceDefinition, bool) {
+ return &server.ServiceDefinition{
+ Handler: healthServer,
+ Info: &triple_health.Health_ServiceInfo,
+ Opts:
[]server.ServiceOption{server.WithNotRegister(),
+
server.WithInterface(constant.HealthCheckServiceInterface)},
+ }, true
+ },
+ Priority: constant.DefaultPriority,
})
// In order to adapt config.Load
diff --git a/protocol/triple/reflection/serverreflection.go b/protocol/triple/reflection/serverreflection.go
index dff18213b..a3f6ac134 100644
--- a/protocol/triple/reflection/serverreflection.go
+++ b/protocol/triple/reflection/serverreflection.go
@@ -263,11 +263,17 @@ var reflectionServer *ReflectionServer
func init() {
reflectionServer = NewServer()
internal.ReflectionRegister = Register
- server.SetProServices(&server.ServiceDefinition{
- Handler: reflectionServer,
- Info: &rpb.ServerReflection_ServiceInfo,
- Opts: []server.ServiceOption{server.WithNotRegister(),
-
server.WithInterface(constant.ReflectionServiceInterface)},
+ server.SetProServices(&server.InternalService{
+ Name: "reflection",
+ Init: func(options *server.ServiceOptions)
(*server.ServiceDefinition, bool) {
+ return &server.ServiceDefinition{
+ Handler: reflectionServer,
+ Info: &rpb.ServerReflection_ServiceInfo,
+ Opts:
[]server.ServiceOption{server.WithNotRegister(),
+
server.WithInterface(constant.ReflectionServiceInterface)},
+ }, true
+ },
+ Priority: constant.DefaultPriority,
})
// In order to adapt config.Load
// Plans for future removal
diff --git a/server/server.go b/server/server.go
index a76d9cd59..0258a940d 100644
--- a/server/server.go
+++ b/server/server.go
@@ -21,6 +21,7 @@ package server
import (
"context"
"fmt"
+ "sort"
"sync"
)
@@ -38,7 +39,8 @@ import (
)
// proServices are for internal services
-var proServices = map[string]*ServiceDefinition{}
+var proServices = make([]*InternalService, 0, 16)
+var proLock sync.Mutex
type Server struct {
invoker protocol.Invoker
@@ -123,8 +125,17 @@ func newInfoInvoker(url *common.URL, info *ServiceInfo, svc common.RPCService) p
// Register assemble invoker chains like ProviderConfig.Load, init a service
per call
func (s *Server) Register(handler interface{}, info *ServiceInfo, opts
...ServiceOption) error {
+ newSvcOpts, err := s.genSvcOpts(handler, opts...)
+ if err != nil {
+ return err
+ }
+ s.svcOptsMap.Store(newSvcOpts, info)
+ return nil
+}
+
+func (s *Server) genSvcOpts(handler interface{}, opts ...ServiceOption)
(*ServiceOptions, error) {
if s.cfg == nil {
- return errors.New("Server has not been initialized, please use
NewServer() to create Server")
+ return nil, errors.New("Server has not been initialized, please
use NewServer() to create Server")
}
var svcOpts []ServiceOption
appCfg := s.cfg.Application
@@ -153,16 +164,13 @@ func (s *Server) Register(handler interface{}, info *ServiceInfo, opts ...Servic
SetRegistries(regsCfg),
)
}
-
// options passed by users have higher priority
svcOpts = append(svcOpts, opts...)
if err := newSvcOpts.init(s, svcOpts...); err != nil {
- return err
+ return nil, err
}
newSvcOpts.Implement(handler)
- s.svcOptsMap.Store(newSvcOpts, info)
-
- return nil
+ return newSvcOpts, nil
}
func (s *Server) exportServices() (err error) {
@@ -187,11 +195,88 @@ func (s *Server) Serve() error {
if err := s.exportServices(); err != nil {
return err
}
+ if err := s.exportInternalServices(); err != nil {
+ return err
+ }
metadata.ExportMetadataService()
registry_exposed.RegisterServiceInstance(s.cfg.Application.Name,
s.cfg.Application.Tag, s.cfg.Application.MetadataType)
select {}
}
+// In order to expose internal services
+func (s *Server) exportInternalServices() error {
+ cfg := &ServiceOptions{}
+ cfg.Application = s.cfg.Application
+ cfg.Provider = s.cfg.Provider
+ cfg.Protocols = s.cfg.Protocols
+ cfg.Registries = s.cfg.Registries
+
+ services := make([]*InternalService, 0, len(proServices))
+
+ proLock.Lock()
+ defer proLock.Unlock()
+ for _, service := range proServices {
+ if service.Init == nil {
+ return errors.New("[internal service]internal service
init func is empty, please set the init func correctly")
+ }
+ sd, ok := service.Init(cfg)
+ if !ok {
+ logger.Infof("[internal service]%s service will not
expose", service.Name)
+ continue
+ }
+ newSvcOpts, err := s.genSvcOpts(sd.Handler, sd.Opts...)
+ if err != nil {
+ return err
+ }
+ service.svcOpts = newSvcOpts
+ service.info = sd.Info
+ services = append(services, service)
+ }
+
+ sort.Slice(services, func(i, j int) bool {
+ return services[i].Priority < services[j].Priority
+ })
+
+ for _, service := range services {
+ if service.BeforeExport != nil {
+ service.BeforeExport(service.svcOpts)
+ }
+ err := service.svcOpts.ExportWithInfo(service.info)
+ if service.AfterExport != nil {
+ service.AfterExport(service.svcOpts, err)
+ }
+ if err != nil {
+ logger.Errorf("[internal service]export %s service
failed, err: %s", service.Name, err)
+ return err
+ }
+ }
+
+ return nil
+}
+
+// InternalService for dubbo internal services
+type InternalService struct {
+ // This is required
+ // internal service name
+ Name string
+ svcOpts *ServiceOptions
+ info *ServiceInfo
+ // This is required
+ // This options is service configuration
+ // Return serviceDefinition and bool, where bool indicates whether it
is exported
+ Init func(options *ServiceOptions) (*ServiceDefinition, bool)
+ // This options is InternalService.svcOpts itself
+ BeforeExport func(options *ServiceOptions)
+ // This options is InternalService.svcOpts itself
+ AfterExport func(options *ServiceOptions, err error)
+ // Priority of service exposure
+ // Lower numbers have the higher priority
+ // The default priority is 0
+ // The metadata service is exposed at the end
+ // If you have no requirements for the order of service exposure, you
can use the default priority or not set
+ Priority int
+}
+
type MethodInfo struct {
Name string
Type string
@@ -210,17 +295,15 @@ func NewServer(opts ...ServerOption) (*Server, error) {
srv := &Server{
cfg: newSrvOpts,
}
-
- for _, service := range proServices {
- err := srv.Register(service.Handler, service.Info,
service.Opts...)
- if err != nil {
- return nil, err
- }
- }
-
return srv, nil
}
-func SetProServices(sd *ServiceDefinition) {
- proServices[sd.Info.InterfaceName] = sd
+func SetProServices(sd *InternalService) {
+ if sd.Name == "" {
+ logger.Warnf("[internal service]internal name is empty, please
set internal name")
+ return
+ }
+ proLock.Lock()
+ defer proLock.Unlock()
+ proServices = append(proServices, sd)
}