This is an automated email from the ASF dual-hosted git repository.

alexstocks pushed a commit to branch 1.5
in repository https://gitbox.apache.org/repos/asf/dubbo-go.git


The following commit(s) were added to refs/heads/1.5 by this push:
     new 13e7a6d  Ftr: Added more event distribution types and improved event 
distribution mechanism for 1.5 (#1405)
13e7a6d is described below

commit 13e7a6dcce74e0874951a726d00490136e0fea5c
Author: ChangedenChan <[email protected]>
AuthorDate: Fri Aug 27 22:03:52 2021 +0800

    Ftr: Added more event distribution types and improved event distribution 
mechanism for 1.5 (#1405)
    
    * 改进旧版的事件分发机制,引入更多事件
    
    * 新增事件删除功能
    
    * 优化命名方式
    
    * 优化代码逻辑;合并ConfigLoaderHook相关Func至ConfigPostProcessor中;
    
    Co-authored-by: Changeden <[email protected]>
---
 common/constant/key.go                     | 11 ++++
 common/extension/config_post_processor.go  |  5 ++
 config/config_loader.go                    | 15 +++++-
 config/config_loader_test.go               | 83 ++++++++++++++++++++++++++++++
 config/graceful_shutdown.go                |  8 +++
 config/interfaces/config_post_processor.go | 15 +++++-
 config/reference_config.go                 | 24 +++++++--
 config/service_config.go                   | 24 +++++++--
 8 files changed, 175 insertions(+), 10 deletions(-)

diff --git a/common/constant/key.go b/common/constant/key.go
index 7240d10..b062b07 100644
--- a/common/constant/key.go
+++ b/common/constant/key.go
@@ -350,3 +350,14 @@ const (
        // SERVICE_DISCOVERY_KEY indicate which service discovery instance will 
be used
        SERVICE_DISCOVERY_KEY = "service_discovery"
 )
+
+// Hook event names passed to ConfigPostProcessor callbacks.
+const (
+       HookEventBeforeReferenceConnect  = "before-reference-connect"
+       HookEventReferenceConnectSuccess = "reference-connect-success"
+       HookEventReferenceConnectFail    = "reference-connect-fail"
+
+       HookEventBeforeProviderConnect  = "before-service-listen"
+       HookEventProviderConnectSuccess = "service-listen-success"
+       HookEventProviderConnectFail    = "service-listen-fail"
+)
diff --git a/common/extension/config_post_processor.go 
b/common/extension/config_post_processor.go
index db126b7..e65bb06 100644
--- a/common/extension/config_post_processor.go
+++ b/common/extension/config_post_processor.go
@@ -35,6 +35,11 @@ func GetConfigPostProcessor(name string) 
interfaces.ConfigPostProcessor {
        return processors[name]
 }
 
+// RemoveConfigPostProcessor removes the processor registered under name from processors.
+func RemoveConfigPostProcessor(name string) {
+       delete(processors, name)
+}
+
 // GetConfigPostProcessors returns all registered instances of 
ConfigPostProcessor.
 func GetConfigPostProcessors() []interfaces.ConfigPostProcessor {
        ret := make([]interfaces.ConfigPostProcessor, 0, len(processors))
diff --git a/config/config_loader.go b/config/config_loader.go
index 14caf2c..7ec64aa 100644
--- a/config/config_loader.go
+++ b/config/config_loader.go
@@ -169,6 +169,7 @@ func loadConsumerConfig() {
        for {
                checkok := true
                for _, refconfig := range consumerConfig.References {
+                       referenceURL := refconfig.getValidURL()
                        if (refconfig.Check != nil && *refconfig.Check) ||
                                (refconfig.Check == nil && consumerConfig.Check 
!= nil && *consumerConfig.Check) ||
                                (refconfig.Check == nil && consumerConfig.Check 
== nil) { // default to true
@@ -179,6 +180,7 @@ func loadConsumerConfig() {
                                        if count > maxWait {
                                                errMsg := fmt.Sprintf("Failed 
to check the status of the service %v. No provider available for the service to 
the consumer use dubbo version %v", refconfig.InterfaceName, constant.Version)
                                                logger.Error(errMsg)
+                                               
refconfig.postProcessConfig(referenceURL, 
constant.HookEventReferenceConnectFail, &errMsg)
                                                panic(errMsg)
                                        }
                                        time.Sleep(time.Second * 1)
@@ -186,13 +188,16 @@ func loadConsumerConfig() {
                                }
                                if refconfig.invoker == nil {
                                        logger.Warnf("The interface %s invoker 
not exist, may you should check your interface config.", 
refconfig.InterfaceName)
+                                       continue
                                }
                        }
+                       refconfig.postProcessConfig(referenceURL, 
constant.HookEventReferenceConnectSuccess, nil)
                }
                if checkok {
                        break
                }
        }
+       postAllConsumersConnectComplete()
 }
 
 func loadProviderConfig() {
@@ -247,11 +252,17 @@ func loadProviderConfig() {
                svs.id = key
                svs.Implement(rpcService)
                svs.Protocols = providerConfig.Protocols
-               if err := svs.Export(); err != nil {
-                       panic(fmt.Sprintf("service %s export failed! err: %#v", 
key, err))
+               err := svs.Export()
+               serviceURL := svs.getValidURL()
+               if err != nil {
+                       errMsg := fmt.Sprintf("service %s export failed! err: 
%#v", key, err)
+                       svs.postProcessConfig(serviceURL, 
constant.HookEventProviderConnectFail, &errMsg)
+                       panic(errMsg)
                }
+               svs.postProcessConfig(serviceURL, 
constant.HookEventProviderConnectSuccess, nil)
        }
        registerServiceInstance()
+       postAllProvidersConnectComplete()
 }
 
 // registerServiceInstance register service instance
diff --git a/config/config_loader_test.go b/config/config_loader_test.go
index 9c99efa..be51665 100644
--- a/config/config_loader_test.go
+++ b/config/config_loader_test.go
@@ -111,6 +111,89 @@ func TestLoad(t *testing.T) {
        providerConfig = nil
 }
 
+type CustomEvent struct {
+       t *testing.T
+}
+
+// implements interfaces.ConfigPostProcessor's functions
+func (c CustomEvent) PostProcessReferenceConfig(u *common.URL, event string, 
errMsg *string) {
+       logger.Debug("PostProcessReferenceConfig Start")
+       logger.Debug("Event: ", event)
+       logger.Debug("Url: ", u)
+       if errMsg != nil {
+               logger.Debug("Error Message: ", *errMsg)
+       }
+       logger.Debug("PostProcessReferenceConfig End")
+       assert.Equal(c.t, u.GetParam(constant.SIDE_KEY, ""), "consumer")
+}
+func (c CustomEvent) PostProcessServiceConfig(u *common.URL, event string, 
errMsg *string) {
+       logger.Debug("PostProcessServiceConfig Start")
+       logger.Debug("Event: ", event)
+       logger.Debug("Url: ", u)
+       if errMsg != nil {
+               logger.Debug("Error Message: ", *errMsg)
+       }
+       logger.Debug("PostProcessServiceConfig End")
+       assert.Equal(c.t, u.GetParam(constant.SIDE_KEY, ""), "provider")
+}
+func (c CustomEvent) AllReferencesConnectComplete() {
+       logger.Debug("AllConsumersConnectComplete")
+}
+func (c CustomEvent) AllServicesListenComplete() {
+       logger.Debug("AllServicesListenComplete")
+}
+func (c CustomEvent) BeforeShutdown() {
+       logger.Debug("BeforeShutdown")
+}
+
+func TestLoadWithEventDispatch(t *testing.T) {
+       doInitConsumer()
+       doInitProvider()
+       for _, v := range providerConfig.Services {
+               v.export = true
+       }
+
+       ms := &MockService{}
+       SetConsumerService(ms)
+       SetProviderService(ms)
+
+       extension.SetProtocol("registry", GetProtocol)
+       extension.SetCluster(constant.ZONEAWARE_CLUSTER_NAME, 
cluster_impl.NewZoneAwareCluster)
+       extension.SetProxyFactory("default", 
proxy_factory.NewDefaultProxyFactory)
+       GetApplicationConfig().MetadataType = "mock"
+       var mm *mockMetadataService
+       extension.SetMetadataService("mock", func() (metadataService 
service.MetadataService, err error) {
+               if mm == nil {
+                       mm = &mockMetadataService{
+                               exportedServiceURLs: new(sync.Map),
+                               lock:                new(sync.RWMutex),
+                       }
+               }
+               return mm, nil
+       })
+
+       configPostProcessorName := "TestLoadWithEventDispatch"
+       extension.SetConfigPostProcessor(configPostProcessorName, 
CustomEvent{t})
+
+       Load()
+
+       assert.Equal(t, ms, GetRPCService(ms.Reference()))
+       ms2 := &struct {
+               MockService
+       }{}
+       RPCService(ms2)
+       assert.NotEqual(t, ms2, GetRPCService(ms2.Reference()))
+
+       conServices = map[string]common.RPCService{}
+       proServices = map[string]common.RPCService{}
+       err := common.ServiceMap.UnRegister("com.MockService", "mock",
+               common.ServiceKey("com.MockService", "huadong_idc", "1.0.0"))
+       assert.Nil(t, err)
+       extension.RemoveConfigPostProcessor(configPostProcessorName)
+       consumerConfig = nil
+       providerConfig = nil
+}
+
 func TestLoadWithSingleReg(t *testing.T) {
        doInitConsumerWithSingleRegistry()
        mockInitProviderWithSingleRegistry()
diff --git a/config/graceful_shutdown.go b/config/graceful_shutdown.go
index 89ac2e3..f8f29c2 100644
--- a/config/graceful_shutdown.go
+++ b/config/graceful_shutdown.go
@@ -67,6 +67,7 @@ func GracefulShutdownInit() {
                        // gracefulShutdownOnce.Do(func() {
                        time.AfterFunc(totalTimeout(), func() {
                                logger.Warn("Shutdown gracefully timeout, 
application will shutdown immediately. ")
+                               postBeforeShutdown()
                                os.Exit(0)
                        })
                        BeforeShutdown()
@@ -76,6 +77,7 @@ func GracefulShutdownInit() {
                                        debug.WriteHeapDump(os.Stdout.Fd())
                                }
                        }
+                       postBeforeShutdown()
                        os.Exit(0)
                }
        }()
@@ -223,3 +225,9 @@ func getConsumerProtocols() *gxset.HashSet {
        }
        return result
 }
+
+func postBeforeShutdown() {
+       for _, p := range extension.GetConfigPostProcessors() {
+               p.BeforeShutdown()
+       }
+}
diff --git a/config/interfaces/config_post_processor.go 
b/config/interfaces/config_post_processor.go
index 53dd717..4578c59 100644
--- a/config/interfaces/config_post_processor.go
+++ b/config/interfaces/config_post_processor.go
@@ -25,8 +25,19 @@ import (
 // ServiceConfig during deployment time.
 type ConfigPostProcessor interface {
        // PostProcessReferenceConfig customizes ReferenceConfig's params.
-       PostProcessReferenceConfig(*common.URL)
+       // PostProcessReferenceConfig is invoked for each reference event (event: 
before-reference-connect, reference-connect-success, reference-connect-fail)
+       PostProcessReferenceConfig(url *common.URL, event string, errMsg 
*string)
 
        // PostProcessServiceConfig customizes ServiceConfig's params.
-       PostProcessServiceConfig(*common.URL)
+       // PostProcessServiceConfig is invoked for each service export event (event: 
before-service-listen, service-listen-success, service-listen-fail)
+       PostProcessServiceConfig(url *common.URL, event string, errMsg *string)
+
+       // AllReferencesConnectComplete is emitted once all references have finished connecting
+       AllReferencesConnectComplete()
+
+       // AllServicesListenComplete is emitted once all services have been exported
+       AllServicesListenComplete()
+
+       // BeforeShutdown is emitted right before the process exits
+       BeforeShutdown()
 }
diff --git a/config/reference_config.go b/config/reference_config.go
index d3730f0..eb35ee3 100644
--- a/config/reference_config.go
+++ b/config/reference_config.go
@@ -101,7 +101,7 @@ func (c *ReferenceConfig) Refer(_ interface{}) {
        if c.ForceTag {
                cfgURL.AddParam(constant.ForceUseTag, "true")
        }
-       c.postProcessConfig(cfgURL)
+       c.postProcessConfig(cfgURL, constant.HookEventBeforeReferenceConnect, 
nil)
        if c.URL != "" {
                // 1. user specified URL, could be peer-to-peer address, or 
register center's address.
                urlStrings := gxstrings.RegSplit(c.URL, "\\s*[;]+\\s*")
@@ -302,8 +302,26 @@ func publishConsumerDefinition(url *common.URL) {
 }
 
 // postProcessConfig asks registered ConfigPostProcessor to post-process the 
current ReferenceConfig.
-func (c *ReferenceConfig) postProcessConfig(url *common.URL) {
+func (c *ReferenceConfig) postProcessConfig(url *common.URL, event string, 
errMsg *string) {
        for _, p := range extension.GetConfigPostProcessors() {
-               p.PostProcessReferenceConfig(url)
+               p.PostProcessReferenceConfig(url, event, errMsg)
+       }
+}
+
+func (c *ReferenceConfig) getValidURL() *common.URL {
+       urls := c.urls
+       var u *common.URL
+       if urls != nil && len(urls) > 0 {
+               u = urls[0]
+       }
+       if u != nil && u.SubURL != nil {
+               return u.SubURL
+       }
+       return u
+}
+
+func postAllConsumersConnectComplete() {
+       for _, p := range extension.GetConfigPostProcessors() {
+               p.AllReferencesConnectComplete()
        }
 }
diff --git a/config/service_config.go b/config/service_config.go
index d73190c..dc4902e 100644
--- a/config/service_config.go
+++ b/config/service_config.go
@@ -203,7 +203,7 @@ func (c *ServiceConfig) Export() error {
                }
 
                // post process the URL to be exported
-               c.postProcessConfig(ivkURL)
+               c.postProcessConfig(ivkURL, 
constant.HookEventBeforeProviderConnect, nil)
                // config post processor may set "export" to false
                if !ivkURL.GetParamBool(constant.EXPORT_KEY, true) {
                        return nil
@@ -355,8 +355,26 @@ func publishServiceDefinition(url *common.URL) {
 }
 
 // postProcessConfig asks registered ConfigPostProcessor to post-process the 
current ServiceConfig.
-func (c *ServiceConfig) postProcessConfig(url *common.URL) {
+func (c *ServiceConfig) postProcessConfig(url *common.URL, event string, 
errMsg *string) {
        for _, p := range extension.GetConfigPostProcessors() {
-               p.PostProcessServiceConfig(url)
+               p.PostProcessServiceConfig(url, event, errMsg)
+       }
+}
+
+func (c *ServiceConfig) getValidURL() *common.URL {
+       urls := c.GetExportedUrls()
+       var u *common.URL
+       if urls != nil && len(urls) > 0 {
+               u = urls[0]
+       }
+       if u != nil && u.SubURL != nil {
+               return u.SubURL
+       }
+       return u
+}
+
+func postAllProvidersConnectComplete() {
+       for _, p := range extension.GetConfigPostProcessors() {
+               p.AllServicesListenComplete()
        }
 }

Reply via email to