This is an automated email from the ASF dual-hosted git repository.

liujun pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/dubbo-go.git


The following commit(s) were added to refs/heads/main by this push:
     new 2e54f26c9 feat:add internal service extension (#2577)
2e54f26c9 is described below

commit 2e54f26c98f6ddd1bac7026197b634cb3bafdf3c
Author: finalt <[email protected]>
AuthorDate: Fri Jan 26 16:05:39 2024 +0800

    feat:add internal service extension (#2577)
    
    fixes #2565, #2539
---
 common/constant/key.go                         |  10 +++
 protocol/triple/health/healthServer.go         |  18 ++--
 protocol/triple/reflection/serverreflection.go |  16 ++--
 server/server.go                               | 117 +++++++++++++++++++++----
 4 files changed, 132 insertions(+), 29 deletions(-)

diff --git a/common/constant/key.go b/common/constant/key.go
index e3dfa8760..5dd9dcb0c 100644
--- a/common/constant/key.go
+++ b/common/constant/key.go
@@ -17,6 +17,8 @@
 
 package constant
 
+import "math"
+
 type DubboCtxKey string
 
 const (
@@ -439,3 +441,11 @@ const (
        DefaultMetaFileName  = "dubbo.metadata."
        DefaultEntrySize     = 100
 )
+
+// priority
+const (
+       DefaultPriority = 0
+       HighestPriority = math.MinInt32
+       // LowestPriority for metadata service
+       LowestPriority = math.MaxInt32
+)
diff --git a/protocol/triple/health/healthServer.go 
b/protocol/triple/health/healthServer.go
index aca420d27..5bd89063e 100644
--- a/protocol/triple/health/healthServer.go
+++ b/protocol/triple/health/healthServer.go
@@ -172,14 +172,18 @@ func (srv *HealthTripleServer) Resume() {
 
 func init() {
        healthServer = NewServer()
-
        internal.HealthSetServingStatusServing = SetServingStatusServing
-
-       server.SetProServices(&server.ServiceDefinition{
-               Handler: healthServer,
-               Info:    &triple_health.Health_ServiceInfo,
-               Opts: []server.ServiceOption{server.WithNotRegister(),
-                       
server.WithInterface(constant.HealthCheckServiceInterface)},
+       server.SetProServices(&server.InternalService{
+               Name: "healthCheck",
+               Init: func(options *server.ServiceOptions) 
(*server.ServiceDefinition, bool) {
+                       return &server.ServiceDefinition{
+                               Handler: healthServer,
+                               Info:    &triple_health.Health_ServiceInfo,
+                               Opts: 
[]server.ServiceOption{server.WithNotRegister(),
+                                       
server.WithInterface(constant.HealthCheckServiceInterface)},
+                       }, true
+               },
+               Priority: constant.DefaultPriority,
        })
 
        // In order to adapt config.Load
diff --git a/protocol/triple/reflection/serverreflection.go 
b/protocol/triple/reflection/serverreflection.go
index dff18213b..a3f6ac134 100644
--- a/protocol/triple/reflection/serverreflection.go
+++ b/protocol/triple/reflection/serverreflection.go
@@ -263,11 +263,17 @@ var reflectionServer *ReflectionServer
 func init() {
        reflectionServer = NewServer()
        internal.ReflectionRegister = Register
-       server.SetProServices(&server.ServiceDefinition{
-               Handler: reflectionServer,
-               Info:    &rpb.ServerReflection_ServiceInfo,
-               Opts: []server.ServiceOption{server.WithNotRegister(),
-                       
server.WithInterface(constant.ReflectionServiceInterface)},
+       server.SetProServices(&server.InternalService{
+               Name: "reflection",
+               Init: func(options *server.ServiceOptions) 
(*server.ServiceDefinition, bool) {
+                       return &server.ServiceDefinition{
+                               Handler: reflectionServer,
+                               Info:    &rpb.ServerReflection_ServiceInfo,
+                               Opts: 
[]server.ServiceOption{server.WithNotRegister(),
+                                       
server.WithInterface(constant.ReflectionServiceInterface)},
+                       }, true
+               },
+               Priority: constant.DefaultPriority,
        })
        // In order to adapt config.Load
        // Plans for future removal
diff --git a/server/server.go b/server/server.go
index a76d9cd59..0258a940d 100644
--- a/server/server.go
+++ b/server/server.go
@@ -21,6 +21,7 @@ package server
 import (
        "context"
        "fmt"
+       "sort"
        "sync"
 )
 
@@ -38,7 +39,8 @@ import (
 )
 
 // proServices are for internal services
-var proServices = map[string]*ServiceDefinition{}
+var proServices = make([]*InternalService, 0, 16)
+var proLock sync.Mutex
 
 type Server struct {
        invoker protocol.Invoker
@@ -123,8 +125,17 @@ func newInfoInvoker(url *common.URL, info *ServiceInfo, 
svc common.RPCService) p
 
 // Register assemble invoker chains like ProviderConfig.Load, init a service 
per call
 func (s *Server) Register(handler interface{}, info *ServiceInfo, opts 
...ServiceOption) error {
+       newSvcOpts, err := s.genSvcOpts(handler, opts...)
+       if err != nil {
+               return err
+       }
+       s.svcOptsMap.Store(newSvcOpts, info)
+       return nil
+}
+
+func (s *Server) genSvcOpts(handler interface{}, opts ...ServiceOption) 
(*ServiceOptions, error) {
        if s.cfg == nil {
-               return errors.New("Server has not been initialized, please use 
NewServer() to create Server")
+               return nil, errors.New("Server has not been initialized, please 
use NewServer() to create Server")
        }
        var svcOpts []ServiceOption
        appCfg := s.cfg.Application
@@ -153,16 +164,13 @@ func (s *Server) Register(handler interface{}, info 
*ServiceInfo, opts ...Servic
                        SetRegistries(regsCfg),
                )
        }
-
        // options passed by users have higher priority
        svcOpts = append(svcOpts, opts...)
        if err := newSvcOpts.init(s, svcOpts...); err != nil {
-               return err
+               return nil, err
        }
        newSvcOpts.Implement(handler)
-       s.svcOptsMap.Store(newSvcOpts, info)
-
-       return nil
+       return newSvcOpts, nil
 }
 
 func (s *Server) exportServices() (err error) {
@@ -187,11 +195,88 @@ func (s *Server) Serve() error {
        if err := s.exportServices(); err != nil {
                return err
        }
+       if err := s.exportInternalServices(); err != nil {
+               return err
+       }
        metadata.ExportMetadataService()
        registry_exposed.RegisterServiceInstance(s.cfg.Application.Name, 
s.cfg.Application.Tag, s.cfg.Application.MetadataType)
        select {}
 }
 
+// In order to expose internal services
+func (s *Server) exportInternalServices() error {
+       cfg := &ServiceOptions{}
+       cfg.Application = s.cfg.Application
+       cfg.Provider = s.cfg.Provider
+       cfg.Protocols = s.cfg.Protocols
+       cfg.Registries = s.cfg.Registries
+
+       services := make([]*InternalService, 0, len(proServices))
+
+       proLock.Lock()
+       defer proLock.Unlock()
+       for _, service := range proServices {
+               if service.Init == nil {
+                       return errors.New("[internal service]internal service 
init func is empty, please set the init func correctly")
+               }
+               sd, ok := service.Init(cfg)
+               if !ok {
+                       logger.Infof("[internal service]%s service will not 
expose", service.Name)
+                       continue
+               }
+               newSvcOpts, err := s.genSvcOpts(sd.Handler, sd.Opts...)
+               if err != nil {
+                       return err
+               }
+               service.svcOpts = newSvcOpts
+               service.info = sd.Info
+               services = append(services, service)
+       }
+
+       sort.Slice(services, func(i, j int) bool {
+               return services[i].Priority < services[j].Priority
+       })
+
+       for _, service := range services {
+               if service.BeforeExport != nil {
+                       service.BeforeExport(service.svcOpts)
+               }
+               err := service.svcOpts.ExportWithInfo(service.info)
+               if service.AfterExport != nil {
+                       service.AfterExport(service.svcOpts, err)
+               }
+               if err != nil {
+                       logger.Errorf("[internal service]export %s service 
failed, err: %s", service.Name, err)
+                       return err
+               }
+       }
+
+       return nil
+}
+
+// InternalService for dubbo internal services
+type InternalService struct {
+       // This is required
+       // internal service name
+       Name    string
+       svcOpts *ServiceOptions
+       info    *ServiceInfo
+       // This is required
+       // This options is service configuration
+       // Return serviceDefinition and bool, where bool indicates whether it 
is exported
+       Init func(options *ServiceOptions) (*ServiceDefinition, bool)
+       // This options is InternalService.svcOpts itself
+       BeforeExport func(options *ServiceOptions)
+       // This options is InternalService.svcOpts itself
+       AfterExport func(options *ServiceOptions, err error)
+       // Priority of service exposure
+       // Lower numbers have the higher priority
+       // The default priority is 0
+       // The metadata service is exposed at the end
+       // If you have no requirements for the order of service exposure, you 
can use the default priority or not set
+       Priority int
+}
+
 type MethodInfo struct {
        Name           string
        Type           string
@@ -210,17 +295,15 @@ func NewServer(opts ...ServerOption) (*Server, error) {
        srv := &Server{
                cfg: newSrvOpts,
        }
-
-       for _, service := range proServices {
-               err := srv.Register(service.Handler, service.Info, 
service.Opts...)
-               if err != nil {
-                       return nil, err
-               }
-       }
-
        return srv, nil
 }
 
-func SetProServices(sd *ServiceDefinition) {
-       proServices[sd.Info.InterfaceName] = sd
+func SetProServices(sd *InternalService) {
+       if sd.Name == "" {
+               logger.Warnf("[internal service]internal name is empty, please 
set internal name")
+               return
+       }
+       proLock.Lock()
+       defer proLock.Unlock()
+       proServices = append(proServices, sd)
 }

Reply via email to