This is an automated email from the ASF dual-hosted git repository.

alexstocks pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/dubbo-go.git


The following commit(s) were added to refs/heads/3.0 by this push:
     new 7ed2c82  Refactor to remove event dispatcher completely (#1368)
7ed2c82 is described below

commit 7ed2c82519530b5acd042e4428a521fe4bd771ad
Author: ken.lj <[email protected]>
AuthorDate: Fri Sep 24 13:25:08 2021 +0800

    Refactor to remove event dispatcher completely (#1368)
    
    * delete event dispatcher
    
    * delete event dispatcher
    
    * delete todo tag
    
    * polish code
    
    * lock listener operation
---
 common/extension/event_dispatcher.go               |  75 ---------
 common/extension/event_dispatcher_test.go          | 110 -------------
 .../observer/dispatcher/direct_event_dispatcher.go |  69 --------
 .../dispatcher/direct_event_dispatcher_test.go     |  77 ---------
 .../observer/dispatcher/mock_event_dispatcher.go   |  66 --------
 common/observer/event_dispatcher.go                |  27 ----
 common/observer/listenable.go                      | 142 -----------------
 common/observer/listenable_test.go                 |  64 --------
 config/config_loader.go                            |  13 +-
 config/config_loader_options.go                    |  23 ---
 registry/etcdv3/service_discovery.go               |  63 +++++---
 .../customizable_service_instance_listener.go      |  72 ---------
 .../customizable_service_instance_listener_test.go |  72 ---------
 .../event_publishing_service_deiscovery_test.go    | 176 ---------------------
 .../event/event_publishing_service_discovery.go    | 156 ------------------
 registry/event/log_event_listener.go               |  61 -------
 registry/event/log_event_listener_test.go          |  32 ----
 .../metadata_service_url_params_customizer.go      |   3 +-
 .../event/protocol_ports_metadata_customizer.go    |   3 +-
 registry/event/service_config_exported_event.go    |  44 ------
 registry/event/service_discovery_event.go          | 103 ------------
 registry/event/service_instance_event.go           |  87 ----------
 registry/event/service_name_mapping_listener.go    |  89 -----------
 registry/event/service_revision_customizer.go      |   5 +-
 registry/file/service_discovery.go                 |  17 --
 registry/nacos/service_discovery.go                |  58 ++++---
 registry/nacos/service_discovery_test.go           |   8 -
 registry/service_discovery.go                      |   9 --
 .../servicediscovery/service_discovery_registry.go |  41 +++--
 .../service_discovery_registry_test.go             |   7 -
 registry/zookeeper/service_discovery.go            |  62 +++++---
 31 files changed, 158 insertions(+), 1676 deletions(-)

diff --git a/common/extension/event_dispatcher.go 
b/common/extension/event_dispatcher.go
deleted file mode 100644
index f2f6a6d..0000000
--- a/common/extension/event_dispatcher.go
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package extension
-
-import (
-       "sync"
-)
-
-import (
-       "dubbo.apache.org/dubbo-go/v3/common/logger"
-       "dubbo.apache.org/dubbo-go/v3/common/observer"
-)
-
-var (
-       globalEventDispatcher observer.EventDispatcher
-       initEventListeners    []func() observer.EventListener
-       initEventOnce         sync.Once
-)
-
-var dispatchers = make(map[string]func() observer.EventDispatcher, 8)
-
-// SetEventDispatcher, actually, it doesn't really init the global dispatcher
-func SetEventDispatcher(name string, v func() observer.EventDispatcher) {
-       dispatchers[name] = v
-}
-
-// SetAndInitGlobalDispatcher will actually init the global dispatcher
-// if there is already a global dispatcher,
-// it will be override
-// if the dispatcher with the name not found, it will panic
-func SetAndInitGlobalDispatcher(name string) {
-       if len(name) == 0 {
-               name = "direct"
-       }
-       if globalEventDispatcher != nil {
-               logger.Warnf("EventDispatcher has been initialized. It will be 
replaced")
-       }
-
-       if dp, ok := dispatchers[name]; !ok || dp == nil {
-               panic("EventDispatcher for " + name + " is not found, make sure 
you have import the package, " +
-                       "like import _ 
dubbo.apache.org/dubbo-go/v3/common/observer/dispatcher.")
-       }
-       globalEventDispatcher = dispatchers[name]()
-}
-
-// GetGlobalDispatcher will init all listener and then return dispatcher
-func GetGlobalDispatcher() observer.EventDispatcher {
-       initEventOnce.Do(func() {
-               // we should delay to add the listeners to avoid some listeners 
left
-               for _, l := range initEventListeners {
-                       globalEventDispatcher.AddEventListener(l())
-               }
-       })
-       return globalEventDispatcher
-}
-
-// AddEventListener it will be added in global event dispatcher
-func AddEventListener(creator func() observer.EventListener) {
-       initEventListeners = append(initEventListeners, creator)
-}
diff --git a/common/extension/event_dispatcher_test.go 
b/common/extension/event_dispatcher_test.go
deleted file mode 100644
index a6baa86..0000000
--- a/common/extension/event_dispatcher_test.go
+++ /dev/null
@@ -1,110 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package extension
-
-import (
-       "reflect"
-       "testing"
-)
-
-import (
-       "github.com/stretchr/testify/assert"
-)
-
-import (
-       "dubbo.apache.org/dubbo-go/v3/common/observer"
-)
-
-func TestSetAndInitGlobalDispatcher(t *testing.T) {
-       mock := &mockEventDispatcher{}
-       SetEventDispatcher("mock", func() observer.EventDispatcher {
-               return mock
-       })
-
-       SetAndInitGlobalDispatcher("mock")
-       dispatcher := GetGlobalDispatcher()
-       assert.NotNil(t, dispatcher)
-       assert.Equal(t, mock, dispatcher)
-
-       mock1 := &mockEventDispatcher{}
-
-       SetEventDispatcher("mock1", func() observer.EventDispatcher {
-               return mock1
-       })
-
-       SetAndInitGlobalDispatcher("mock1")
-       dispatcher = GetGlobalDispatcher()
-       assert.NotNil(t, dispatcher)
-       assert.Equal(t, mock1, dispatcher)
-}
-
-func TestAddEventListener(t *testing.T) {
-       AddEventListener(func() observer.EventListener {
-               return &mockEventListener{}
-       })
-
-       AddEventListener(func() observer.EventListener {
-               return &mockEventListener{}
-       })
-
-       assert.Equal(t, 2, len(initEventListeners))
-}
-
-type mockEventListener struct{}
-
-func (m mockEventListener) GetPriority() int {
-       panic("implement me")
-}
-
-func (m mockEventListener) OnEvent(e observer.Event) error {
-       panic("implement me")
-}
-
-func (m mockEventListener) GetEventType() reflect.Type {
-       panic("implement me")
-}
-
-type mockEventDispatcher struct{}
-
-func (m mockEventDispatcher) AddEventListener(listener observer.EventListener) 
{
-       panic("implement me")
-}
-
-func (m mockEventDispatcher) AddEventListeners(listenersSlice 
[]observer.EventListener) {
-       panic("implement me")
-}
-
-func (m mockEventDispatcher) RemoveEventListener(listener 
observer.EventListener) {
-       panic("implement me")
-}
-
-func (m mockEventDispatcher) RemoveEventListeners(listenersSlice 
[]observer.EventListener) {
-       panic("implement me")
-}
-
-func (m mockEventDispatcher) GetAllEventListeners() []observer.EventListener {
-       panic("implement me")
-}
-
-func (m mockEventDispatcher) RemoveAllEventListeners() {
-       panic("implement me")
-}
-
-func (m mockEventDispatcher) Dispatch(event observer.Event) {
-       panic("implement me")
-}
diff --git a/common/observer/dispatcher/direct_event_dispatcher.go 
b/common/observer/dispatcher/direct_event_dispatcher.go
deleted file mode 100644
index 4fc0a10..0000000
--- a/common/observer/dispatcher/direct_event_dispatcher.go
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package dispatcher
-
-import (
-       "reflect"
-)
-
-import (
-       "dubbo.apache.org/dubbo-go/v3/common/extension"
-       "dubbo.apache.org/dubbo-go/v3/common/logger"
-       "dubbo.apache.org/dubbo-go/v3/common/observer"
-)
-
-func init() {
-       extension.SetEventDispatcher("direct", NewDirectEventDispatcher)
-}
-
-// DirectEventDispatcher is align with DirectEventDispatcher interface in Java.
-// it's the top abstraction
-// Align with 2.7.5
-// Dispatcher event to listener direct
-type DirectEventDispatcher struct {
-       observer.BaseListener
-}
-
-// NewDirectEventDispatcher ac constructor of DirectEventDispatcher
-func NewDirectEventDispatcher() observer.EventDispatcher {
-       return &DirectEventDispatcher{
-               BaseListener: observer.NewBaseListener(),
-       }
-}
-
-// Dispatch event directly
-// it lookup the listener by event's type.
-// if listener not found, it just return and do nothing
-func (ded *DirectEventDispatcher) Dispatch(event observer.Event) {
-       if event == nil {
-               logger.Warnf("[DirectEventDispatcher] dispatch event nil")
-               return
-       }
-       eventType := reflect.TypeOf(event).Elem()
-       ded.Mutex.RLock()
-       defer ded.Mutex.RUnlock()
-       listenersSlice, loaded := ded.ListenersCache[eventType]
-       if !loaded {
-               return
-       }
-       for _, listener := range listenersSlice {
-               if err := listener.OnEvent(event); err != nil {
-                       logger.Warnf("[DirectEventDispatcher] dispatch event 
error:%v", err)
-               }
-       }
-}
diff --git a/common/observer/dispatcher/direct_event_dispatcher_test.go 
b/common/observer/dispatcher/direct_event_dispatcher_test.go
deleted file mode 100644
index ba183a6..0000000
--- a/common/observer/dispatcher/direct_event_dispatcher_test.go
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package dispatcher
-
-import (
-       "fmt"
-       "reflect"
-       "testing"
-)
-
-import (
-       "dubbo.apache.org/dubbo-go/v3/common/observer"
-)
-
-func TestDirectEventDispatcher_Dispatch(t *testing.T) {
-       ded := NewDirectEventDispatcher()
-       ded.AddEventListener(&TestEventListener{
-               BaseListener: observer.NewBaseListener(),
-       })
-       ded.AddEventListener(&TestEventListener1{})
-       ded.Dispatch(&TestEvent{})
-       ded.Dispatch(nil)
-}
-
-type TestEvent struct {
-       observer.BaseEvent
-}
-
-type TestEventListener struct {
-       observer.BaseListener
-       observer.EventListener
-}
-
-func (tel *TestEventListener) OnEvent(e observer.Event) error {
-       fmt.Println("TestEventListener")
-       return nil
-}
-
-func (tel *TestEventListener) GetPriority() int {
-       return -1
-}
-
-func (tel *TestEventListener) GetEventType() reflect.Type {
-       return reflect.TypeOf(&TestEvent{})
-}
-
-type TestEventListener1 struct {
-       observer.EventListener
-}
-
-func (tel *TestEventListener1) OnEvent(e observer.Event) error {
-       fmt.Println("TestEventListener1")
-       return nil
-}
-
-func (tel *TestEventListener1) GetPriority() int {
-       return 1
-}
-
-func (tel *TestEventListener1) GetEventType() reflect.Type {
-       return reflect.TypeOf(TestEvent{})
-}
diff --git a/common/observer/dispatcher/mock_event_dispatcher.go 
b/common/observer/dispatcher/mock_event_dispatcher.go
deleted file mode 100644
index 1835448..0000000
--- a/common/observer/dispatcher/mock_event_dispatcher.go
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package dispatcher
-
-import (
-       "dubbo.apache.org/dubbo-go/v3/common/observer"
-)
-
-// MockEventDispatcher will do nothing.
-// It is only used by tests
-// Now the implementation doing nothing,
-// But you can modify this if needed
-type MockEventDispatcher struct {
-       Notify chan struct{}
-       Event  observer.Event
-}
-
-func NewMockEventDispatcher() *MockEventDispatcher {
-       return &MockEventDispatcher{Notify: make(chan struct{}, 1)}
-}
-
-// AddEventListener do nothing
-func (m *MockEventDispatcher) AddEventListener(listener 
observer.EventListener) {
-}
-
-// AddEventListeners do nothing
-func (m *MockEventDispatcher) AddEventListeners(listenersSlice 
[]observer.EventListener) {
-}
-
-// RemoveEventListener do nothing
-func (m *MockEventDispatcher) RemoveEventListener(listener 
observer.EventListener) {
-}
-
-// RemoveEventListeners do nothing
-func (m *MockEventDispatcher) RemoveEventListeners(listenersSlice 
[]observer.EventListener) {
-}
-
-// GetAllEventListeners return empty list
-func (m *MockEventDispatcher) GetAllEventListeners() []observer.EventListener {
-       return make([]observer.EventListener, 0)
-}
-
-// RemoveAllEventListeners do nothing
-func (m *MockEventDispatcher) RemoveAllEventListeners() {
-}
-
-// Dispatch do nothing
-func (m *MockEventDispatcher) Dispatch(event observer.Event) {
-       m.Event = event
-       m.Notify <- struct{}{}
-}
diff --git a/common/observer/event_dispatcher.go 
b/common/observer/event_dispatcher.go
deleted file mode 100644
index 17745e6..0000000
--- a/common/observer/event_dispatcher.go
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package observer
-
-// EventDispatcher is align with EventDispatcher interface in Java.
-// it's the top abstraction
-// Align with 2.7.5
-type EventDispatcher interface {
-       Listenable
-       // Dispatch event
-       Dispatch(event Event)
-}
diff --git a/common/observer/listenable.go b/common/observer/listenable.go
deleted file mode 100644
index bf6ae72..0000000
--- a/common/observer/listenable.go
+++ /dev/null
@@ -1,142 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package observer
-
-import (
-       "reflect"
-       "sort"
-       "sync"
-)
-
-// Listenable could add and remove the event listener
-type Listenable interface {
-       AddEventListener(listener EventListener)
-       AddEventListeners(listenersSlice []EventListener)
-       RemoveEventListener(listener EventListener)
-       RemoveEventListeners(listenersSlice []EventListener)
-       GetAllEventListeners() []EventListener
-       RemoveAllEventListeners()
-}
-
-// BaseListener base listenable
-type BaseListener struct {
-       Listenable
-       ListenersCache map[reflect.Type][]EventListener
-       Mutex          sync.RWMutex
-}
-
-// NewBaseListener a constructor of base listenable
-func NewBaseListener() BaseListener {
-       return BaseListener{
-               ListenersCache: make(map[reflect.Type][]EventListener, 8),
-       }
-}
-
-// AddEventListener add event listener
-func (bl *BaseListener) AddEventListener(listener EventListener) {
-       eventType := listener.GetEventType()
-       if eventType.Kind() == reflect.Ptr {
-               eventType = eventType.Elem()
-       }
-       bl.Mutex.Lock()
-       defer bl.Mutex.Unlock()
-       listenersSlice, loaded := bl.ListenersCache[eventType]
-       if !loaded {
-               listenersSlice = make([]EventListener, 0, 8)
-       }
-       // return if listenersSlice already has this listener
-       if loaded && containListener(listenersSlice, listener) {
-               return
-       }
-       listenersSlice = append(listenersSlice, listener)
-       sort.Slice(listenersSlice, func(i, j int) bool {
-               return listenersSlice[i].GetPriority() < 
listenersSlice[j].GetPriority()
-       })
-       bl.ListenersCache[eventType] = listenersSlice
-}
-
-// AddEventListeners add the slice of event listener
-func (bl *BaseListener) AddEventListeners(listenersSlice []EventListener) {
-       for _, listener := range listenersSlice {
-               bl.AddEventListener(listener)
-       }
-}
-
-// RemoveEventListener remove the event listener
-func (bl *BaseListener) RemoveEventListener(listener EventListener) {
-       eventType := listener.GetEventType()
-       if eventType.Kind() == reflect.Ptr {
-               eventType = eventType.Elem()
-       }
-       bl.Mutex.Lock()
-       defer bl.Mutex.Unlock()
-       listenersSlice, loaded := bl.ListenersCache[eventType]
-       if !loaded {
-               return
-       }
-       for i, l := range listenersSlice {
-               if l == listener {
-                       listenersSlice = append(listenersSlice[:i], 
listenersSlice[i+1:]...)
-               }
-       }
-       bl.ListenersCache[eventType] = listenersSlice
-}
-
-// RemoveEventListeners remove the slice of event listener
-// it will iterate all listener and remove it one by one
-func (bl *BaseListener) RemoveEventListeners(listenersSlice []EventListener) {
-       for _, listener := range listenersSlice {
-               bl.RemoveEventListener(listener)
-       }
-}
-
-// RemoveAllEventListeners remove all
-// using Lock
-func (bl *BaseListener) RemoveAllEventListeners() {
-       bl.Mutex.Lock()
-       defer bl.Mutex.Unlock()
-       bl.ListenersCache = make(map[reflect.Type][]EventListener, 8)
-}
-
-// GetAllEventListeners get all listener
-// using RLock
-func (bl *BaseListener) GetAllEventListeners() []EventListener {
-       allListenersSlice := make([]EventListener, 0, 16)
-
-       bl.Mutex.RLock()
-       defer bl.Mutex.RUnlock()
-       for _, listenersSlice := range bl.ListenersCache {
-               allListenersSlice = append(allListenersSlice, listenersSlice...)
-       }
-       sort.Slice(allListenersSlice, func(i, j int) bool {
-               return allListenersSlice[i].GetPriority() < 
allListenersSlice[j].GetPriority()
-       })
-       return allListenersSlice
-}
-
-// containListener true if contain listener
-// it's not thread safe
-// usually it should be use in lock scope
-func containListener(listenersSlice []EventListener, listener EventListener) 
bool {
-       for _, loadListener := range listenersSlice {
-               if loadListener == listener {
-                       return true
-               }
-       }
-       return false
-}
diff --git a/common/observer/listenable_test.go 
b/common/observer/listenable_test.go
deleted file mode 100644
index 3ae6e5e..0000000
--- a/common/observer/listenable_test.go
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package observer
-
-import (
-       "reflect"
-       "testing"
-)
-
-import (
-       "github.com/stretchr/testify/assert"
-)
-
-func TestListenable(t *testing.T) {
-       el := &TestEventListener{}
-       bl := NewBaseListener()
-       b := &bl
-       b.AddEventListener(el)
-       b.AddEventListener(el)
-       al := b.GetAllEventListeners()
-       assert.Equal(t, len(al), 1)
-       assert.Equal(t, al[0].GetEventType(), reflect.TypeOf(TestEvent{}))
-       b.RemoveEventListener(el)
-       assert.Equal(t, len(b.GetAllEventListeners()), 0)
-       var ts []EventListener
-       ts = append(ts, el)
-       b.AddEventListeners(ts)
-       assert.Equal(t, len(al), 1)
-}
-
-type TestEvent struct {
-       BaseEvent
-}
-
-type TestEventListener struct {
-       EventListener
-}
-
-func (tel *TestEventListener) OnEvent(e Event) error {
-       return nil
-}
-
-func (tel *TestEventListener) GetPriority() int {
-       return -1
-}
-
-func (tel *TestEventListener) GetEventType() reflect.Type {
-       return reflect.TypeOf(TestEvent{})
-}
diff --git a/config/config_loader.go b/config/config_loader.go
index a87abf4..cd4f171 100644
--- a/config/config_loader.go
+++ b/config/config_loader.go
@@ -38,7 +38,6 @@ import (
        "dubbo.apache.org/dubbo-go/v3/common/constant"
        "dubbo.apache.org/dubbo-go/v3/common/extension"
        "dubbo.apache.org/dubbo-go/v3/common/logger"
-       _ "dubbo.apache.org/dubbo-go/v3/common/observer/dispatcher"
        "dubbo.apache.org/dubbo-go/v3/common/yaml"
        "dubbo.apache.org/dubbo-go/v3/registry"
 )
@@ -90,7 +89,7 @@ func DefaultInit() []LoaderInitOption {
        if confRouterFile == "" {
                confRouterFile = constant.DEFAULT_ROUTER_CONF_FILE_PATH
        }
-       return []LoaderInitOption{RouterInitOption(confRouterFile), 
BaseInitOption(""), ConsumerInitOption(confConFile), 
ProviderInitOption(confProFile)}
+       return []LoaderInitOption{RouterInitOption(confRouterFile), 
ConsumerInitOption(confConFile), ProviderInitOption(confProFile)}
 }
 
 // setDefaultValue set default value for providerConfig or consumerConfig if 
it is null
@@ -325,7 +324,7 @@ func createInstance(url *common.URL) 
(registry.ServiceInstance, error) {
        metadata := make(map[string]string, 8)
        metadata[constant.METADATA_STORAGE_TYPE_PROPERTY_NAME] = 
appConfig.MetadataType
 
-       return &registry.DefaultServiceInstance{
+       instance := &registry.DefaultServiceInstance{
                ServiceName: appConfig.Name,
                Host:        host,
                Port:        int(port),
@@ -333,7 +332,13 @@ func createInstance(url *common.URL) 
(registry.ServiceInstance, error) {
                Enable:      true,
                Healthy:     true,
                Metadata:    metadata,
-       }, nil
+       }
+
+       for _, cus := range extension.GetCustomizers() {
+               cus.Customize(instance)
+       }
+
+       return instance, nil
 }
 
 // selectMetadataServiceExportedURL get already be exported url
diff --git a/config/config_loader_options.go b/config/config_loader_options.go
index bc8aff1..f519bec 100644
--- a/config/config_loader_options.go
+++ b/config/config_loader_options.go
@@ -21,10 +21,6 @@ import (
        "log"
 )
 
-import (
-       "dubbo.apache.org/dubbo-go/v3/common/extension"
-)
-
 type LoaderInitOption interface {
        init()
        apply()
@@ -117,22 +113,3 @@ func RouterInitOption(crf string) LoaderInitOption {
                },
        }
 }
-
-func BaseInitOption(cbf string) LoaderInitOption {
-       return &optionFunc{
-               func() {
-                       if cbf == "" {
-                               return
-                       }
-                       confBaseFile = cbf
-                       if err := BaseInit(cbf); err != nil {
-                               log.Printf("[BaseInit] %#v", err)
-                               baseConfig = nil
-                       }
-               },
-               func() {
-                       // init the global event dispatcher
-                       
extension.SetAndInitGlobalDispatcher(GetBaseConfig().EventDispatcherType)
-               },
-       }
-}
diff --git a/registry/etcdv3/service_discovery.go 
b/registry/etcdv3/service_discovery.go
index 85679f2..a081917 100644
--- a/registry/etcdv3/service_discovery.go
+++ b/registry/etcdv3/service_discovery.go
@@ -65,7 +65,8 @@ type etcdV3ServiceDiscovery struct {
        // services is when register or update will add service name
        services *gxset.HashSet
        // child listener
-       childListenerMap map[string]*etcdv3.EventListener
+       childListenerMap    map[string]*etcdv3.EventListener
+       instanceListenerMap map[string]*gxset.HashSet
 }
 
 // basic information of this instance
@@ -217,28 +218,16 @@ func (e *etcdV3ServiceDiscovery) 
GetRequestInstances(serviceNames []string, offs
 // see addServiceInstancesChangedListener in Java
 func (e *etcdV3ServiceDiscovery) AddListener(listener 
registry.ServiceInstancesChangedListener) error {
        for _, t := range listener.GetServiceNames().Values() {
-               serviceName := t.(string)
-               err := e.registerSreviceWatcher(serviceName)
+               err := e.registerServiceInstanceListener(t.(string), listener)
                if err != nil {
                        return err
                }
-       }
-       return nil
-}
 
-// DispatchEventByServiceName dispatches the ServiceInstancesChangedEvent to 
service instance whose name is serviceName
-func (e *etcdV3ServiceDiscovery) DispatchEventByServiceName(serviceName 
string) error {
-       return e.DispatchEventForInstances(serviceName, 
e.GetInstances(serviceName))
-}
-
-// DispatchEventForInstances dispatches the ServiceInstancesChangedEvent to 
target instances
-func (e *etcdV3ServiceDiscovery) DispatchEventForInstances(serviceName string, 
instances []registry.ServiceInstance) error {
-       return 
e.DispatchEvent(registry.NewServiceInstancesChangedEvent(serviceName, 
instances))
-}
-
-// DispatchEvent dispatches the event
-func (e *etcdV3ServiceDiscovery) DispatchEvent(event 
*registry.ServiceInstancesChangedEvent) error {
-       extension.GetGlobalDispatcher().Dispatch(event)
+               err = e.registerServiceWatcher(t.(string))
+               if err != nil {
+                       return err
+               }
+       }
        return nil
 }
 
@@ -256,8 +245,24 @@ func toParentPath(serviceName string) string {
        return ROOT + constant.PATH_SEPARATOR + serviceName
 }
 
+// register service instance listener, instance listener and watcher are 
matched through serviceName
+func (e *etcdV3ServiceDiscovery) registerServiceInstanceListener(serviceName 
string, listener registry.ServiceInstancesChangedListener) error {
+       initLock.Lock()
+       defer initLock.Unlock()
+
+       set, found := e.instanceListenerMap[serviceName]
+       if !found {
+               set = gxset.NewSet(listener)
+               set.Add(listener)
+               e.instanceListenerMap[serviceName] = set
+               return nil
+       }
+       set.Add(listener)
+       return nil
+}
+
 // register service watcher
-func (e *etcdV3ServiceDiscovery) registerSreviceWatcher(serviceName string) 
error {
+func (e *etcdV3ServiceDiscovery) registerServiceWatcher(serviceName string) 
error {
        initLock.Lock()
        defer initLock.Unlock()
 
@@ -284,7 +289,15 @@ func (e *etcdV3ServiceDiscovery) DataChange(eventType 
remoting.Event) bool {
                        instance.ServiceName = ""
                }
 
-               if err := e.DispatchEventByServiceName(instance.ServiceName); 
err != nil {
+               // notify instance listener instance change
+               name := instance.ServiceName
+               instances := e.GetInstances(name)
+               for _, lis := range 
e.instanceListenerMap[instance.ServiceName].Values() {
+                       var instanceLis registry.ServiceInstancesChangedListener
+                       instanceLis = 
lis.(registry.ServiceInstancesChangedListener)
+                       err = 
instanceLis.OnEvent(registry.NewServiceInstancesChangedEvent(name, instances))
+               }
+               if err != nil {
                        return false
                }
        }
@@ -324,5 +337,11 @@ func newEtcdV3ServiceDiscovery(name string) 
(registry.ServiceDiscovery, error) {
 
        descriptor := fmt.Sprintf("etcd-service-discovery[%s]", 
remoteConfig.Address)
 
-       return &etcdV3ServiceDiscovery{descriptor, client, nil, gxset.NewSet(), 
make(map[string]*etcdv3.EventListener)}, nil
+       return &etcdV3ServiceDiscovery{
+               descriptor,
+               client,
+               nil,
+               gxset.NewSet(),
+               make(map[string]*etcdv3.EventListener),
+               make(map[string]*gxset.HashSet)}, nil
 }
diff --git a/registry/event/customizable_service_instance_listener.go 
b/registry/event/customizable_service_instance_listener.go
deleted file mode 100644
index e38199f..0000000
--- a/registry/event/customizable_service_instance_listener.go
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package event
-
-import (
-       "reflect"
-       "sync"
-)
-
-import (
-       "dubbo.apache.org/dubbo-go/v3/common/extension"
-       "dubbo.apache.org/dubbo-go/v3/common/observer"
-)
-
-func init() {
-       extension.AddEventListener(GetCustomizableServiceInstanceListener)
-}
-
-// customizableServiceInstanceListener is singleton
-type customizableServiceInstanceListener struct{}
-
-// GetPriority return priority 9999,
-// 9999 is big enough to make sure it will be last invoked
-func (c *customizableServiceInstanceListener) GetPriority() int {
-       return 9999
-}
-
-// OnEvent if the event is ServiceInstancePreRegisteredEvent
-// it will iterate all ServiceInstanceCustomizer instances
-// or it will do nothing
-func (c *customizableServiceInstanceListener) OnEvent(e observer.Event) error {
-       if preRegEvent, ok := e.(*ServiceInstancePreRegisteredEvent); ok {
-               for _, cus := range extension.GetCustomizers() {
-                       cus.Customize(preRegEvent.serviceInstance)
-               }
-       }
-       return nil
-}
-
-// GetEventType will return ServiceInstancePreRegisteredEvent
-func (c *customizableServiceInstanceListener) GetEventType() reflect.Type {
-       return reflect.TypeOf(&ServiceInstancePreRegisteredEvent{})
-}
-
-var (
-       customizableServiceInstanceListenerInstance 
*customizableServiceInstanceListener
-       customizableServiceInstanceListenerOnce     sync.Once
-)
-
-// GetCustomizableServiceInstanceListener returns an instance
-// if the instance was not initialized, we create one
-func GetCustomizableServiceInstanceListener() observer.EventListener {
-       customizableServiceInstanceListenerOnce.Do(func() {
-               customizableServiceInstanceListenerInstance = 
&customizableServiceInstanceListener{}
-       })
-       return customizableServiceInstanceListenerInstance
-}
diff --git a/registry/event/customizable_service_instance_listener_test.go 
b/registry/event/customizable_service_instance_listener_test.go
deleted file mode 100644
index 963f0aa..0000000
--- a/registry/event/customizable_service_instance_listener_test.go
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package event
-
-import (
-       "testing"
-       "time"
-)
-
-import (
-       "github.com/stretchr/testify/assert"
-)
-
-import (
-       "dubbo.apache.org/dubbo-go/v3/common/extension"
-       "dubbo.apache.org/dubbo-go/v3/registry"
-)
-
-func TestGetCustomizableServiceInstanceListener(t *testing.T) {
-
-       cus := GetCustomizableServiceInstanceListener()
-
-       assert.Equal(t, 9999, cus.GetPriority())
-
-       extension.AddCustomizers(&mockCustomizer{})
-
-       err := cus.OnEvent(&mockEvent{})
-       assert.Nil(t, err)
-       err = cus.OnEvent(NewServiceInstancePreRegisteredEvent("hello", 
createInstance()))
-       assert.Nil(t, err)
-
-       tp := cus.GetEventType()
-       assert.NotNil(t, tp)
-}
-
-type mockEvent struct{}
-
-func (m *mockEvent) String() string {
-       panic("implement me")
-}
-
-func (m *mockEvent) GetSource() interface{} {
-       panic("implement me")
-}
-
-func (m *mockEvent) GetTimestamp() time.Time {
-       panic("implement me")
-}
-
-type mockCustomizer struct{}
-
-func (m *mockCustomizer) GetPriority() int {
-       return 0
-}
-
-func (m *mockCustomizer) Customize(instance registry.ServiceInstance) {
-}
diff --git a/registry/event/event_publishing_service_deiscovery_test.go 
b/registry/event/event_publishing_service_deiscovery_test.go
deleted file mode 100644
index 13983b1..0000000
--- a/registry/event/event_publishing_service_deiscovery_test.go
+++ /dev/null
@@ -1,176 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package event
-
-import (
-       "reflect"
-       "testing"
-)
-
-import (
-       gxset "github.com/dubbogo/gost/container/set"
-       gxpage "github.com/dubbogo/gost/hash/page"
-       "github.com/stretchr/testify/assert"
-       "github.com/stretchr/testify/suite"
-)
-
-import (
-       "dubbo.apache.org/dubbo-go/v3/common/extension"
-       "dubbo.apache.org/dubbo-go/v3/common/observer"
-       "dubbo.apache.org/dubbo-go/v3/common/observer/dispatcher"
-       "dubbo.apache.org/dubbo-go/v3/config"
-       "dubbo.apache.org/dubbo-go/v3/metadata/mapping"
-       _ "dubbo.apache.org/dubbo-go/v3/metadata/service/local"
-       "dubbo.apache.org/dubbo-go/v3/registry"
-)
-
-func TestEventPublishingServiceDiscovery_DispatchEvent(t *testing.T) {
-       // extension.SetMetadataService("local", local.NewMetadataService)
-
-       config.GetApplicationConfig().MetadataType = "local"
-
-       extension.SetGlobalServiceNameMapping(func() mapping.ServiceNameMapping 
{
-               return mapping.NewMockServiceNameMapping()
-       })
-
-       dc := NewEventPublishingServiceDiscovery(&ServiceDiscoveryA{})
-       tsd := &TestServiceDiscoveryDestroyingEventListener{
-               BaseListener: observer.NewBaseListener(),
-       }
-       tsd.SetT(t)
-       tsi := &TestServiceInstancePreRegisteredEventListener{}
-       tsi.SetT(t)
-       extension.AddEventListener(func() observer.EventListener {
-               return tsd
-       })
-       extension.AddEventListener(func() observer.EventListener {
-               return tsi
-       })
-       extension.SetEventDispatcher("direct", 
dispatcher.NewDirectEventDispatcher)
-       extension.SetAndInitGlobalDispatcher("direct")
-       err := dc.Destroy()
-       assert.Nil(t, err)
-       si := &registry.DefaultServiceInstance{ID: "testServiceInstance"}
-       err = dc.Register(si)
-       assert.Nil(t, err)
-}
-
-type TestServiceDiscoveryDestroyingEventListener struct {
-       suite.Suite
-       observer.BaseListener
-}
-
-func (tel *TestServiceDiscoveryDestroyingEventListener) OnEvent(e 
observer.Event) error {
-       e1, ok := e.(*ServiceDiscoveryDestroyingEvent)
-       assert.Equal(tel.T(), ok, true)
-       assert.Equal(tel.T(), "testServiceDiscovery", e1.GetOriginal().String())
-       assert.Equal(tel.T(), "testServiceDiscovery", 
e1.GetServiceDiscovery().String())
-       return nil
-}
-
-func (tel *TestServiceDiscoveryDestroyingEventListener) GetPriority() int {
-       return -1
-}
-
-func (tel *TestServiceDiscoveryDestroyingEventListener) GetEventType() 
reflect.Type {
-       return reflect.TypeOf(ServiceDiscoveryDestroyingEvent{})
-}
-
-type TestServiceInstancePreRegisteredEventListener struct {
-       suite.Suite
-       observer.BaseListener
-}
-
-func (tel *TestServiceInstancePreRegisteredEventListener) OnEvent(e 
observer.Event) error {
-       e1, ok := e.(*ServiceInstancePreRegisteredEvent)
-       assert.Equal(tel.T(), ok, true)
-       assert.Equal(tel.T(), "testServiceInstance", 
e1.getServiceInstance().GetID())
-       return nil
-}
-
-func (tel *TestServiceInstancePreRegisteredEventListener) GetPriority() int {
-       return -1
-}
-
-func (tel *TestServiceInstancePreRegisteredEventListener) GetEventType() 
reflect.Type {
-       return reflect.TypeOf(ServiceInstancePreRegisteredEvent{})
-}
-
-type ServiceDiscoveryA struct{}
-
-// String return mockServiceDiscovery
-func (msd *ServiceDiscoveryA) String() string {
-       return "testServiceDiscovery"
-}
-
-// Destroy do nothing
-func (msd *ServiceDiscoveryA) Destroy() error {
-       return nil
-}
-
-func (msd *ServiceDiscoveryA) Register(instance registry.ServiceInstance) 
error {
-       return nil
-}
-
-func (msd *ServiceDiscoveryA) Update(instance registry.ServiceInstance) error {
-       return nil
-}
-
-func (msd *ServiceDiscoveryA) Unregister(instance registry.ServiceInstance) 
error {
-       return nil
-}
-
-func (msd *ServiceDiscoveryA) GetDefaultPageSize() int {
-       return 1
-}
-
-func (msd *ServiceDiscoveryA) GetServices() *gxset.HashSet {
-       return nil
-}
-
-func (msd *ServiceDiscoveryA) GetInstances(serviceName string) 
[]registry.ServiceInstance {
-       return nil
-}
-
-func (msd *ServiceDiscoveryA) GetInstancesByPage(serviceName string, offset 
int, pageSize int) gxpage.Pager {
-       return nil
-}
-
-func (msd *ServiceDiscoveryA) GetHealthyInstancesByPage(serviceName string, 
offset int, pageSize int, healthy bool) gxpage.Pager {
-       return nil
-}
-
-func (msd *ServiceDiscoveryA) GetRequestInstances(serviceNames []string, 
offset int, requestedSize int) map[string]gxpage.Pager {
-       return nil
-}
-
-func (msd *ServiceDiscoveryA) AddListener(listener 
registry.ServiceInstancesChangedListener) error {
-       return nil
-}
-
-func (msd *ServiceDiscoveryA) DispatchEventByServiceName(serviceName string) 
error {
-       return nil
-}
-
-func (msd *ServiceDiscoveryA) DispatchEventForInstances(serviceName string, 
instances []registry.ServiceInstance) error {
-       return nil
-}
-
-func (msd *ServiceDiscoveryA) DispatchEvent(event 
*registry.ServiceInstancesChangedEvent) error {
-       return nil
-}
diff --git a/registry/event/event_publishing_service_discovery.go 
b/registry/event/event_publishing_service_discovery.go
deleted file mode 100644
index b360dab..0000000
--- a/registry/event/event_publishing_service_discovery.go
+++ /dev/null
@@ -1,156 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package event
-
-import (
-       gxset "github.com/dubbogo/gost/container/set"
-       gxpage "github.com/dubbogo/gost/hash/page"
-)
-
-import (
-       "dubbo.apache.org/dubbo-go/v3/common/extension"
-       "dubbo.apache.org/dubbo-go/v3/common/observer"
-       "dubbo.apache.org/dubbo-go/v3/metadata/service"
-       "dubbo.apache.org/dubbo-go/v3/metadata/service/local"
-       "dubbo.apache.org/dubbo-go/v3/registry"
-)
-
-// EventPublishingServiceDiscovery will enhance Service Discovery
-// Publish some event about service discovery
-type EventPublishingServiceDiscovery struct {
-       serviceDiscovery registry.ServiceDiscovery
-}
-
-// NewEventPublishingServiceDiscovery is a constructor
-func NewEventPublishingServiceDiscovery(serviceDiscovery 
registry.ServiceDiscovery) *EventPublishingServiceDiscovery {
-       return &EventPublishingServiceDiscovery{
-               serviceDiscovery: serviceDiscovery,
-       }
-}
-
-// String returns serviceDiscovery.String()
-func (epsd *EventPublishingServiceDiscovery) String() string {
-       return epsd.serviceDiscovery.String()
-}
-
-// Destroy delegate function
-func (epsd *EventPublishingServiceDiscovery) Destroy() error {
-       f := func() error {
-               return epsd.serviceDiscovery.Destroy()
-       }
-       return epsd.executeWithEvents(NewServiceDiscoveryDestroyingEvent(epsd, 
epsd.serviceDiscovery),
-               f, NewServiceDiscoveryDestroyedEvent(epsd, 
epsd.serviceDiscovery))
-}
-
-// Register delegate function
-func (epsd *EventPublishingServiceDiscovery) Register(instance 
registry.ServiceInstance) error {
-       f := func() error {
-               return epsd.serviceDiscovery.Register(instance)
-       }
-       return 
epsd.executeWithEvents(NewServiceInstancePreRegisteredEvent(epsd.serviceDiscovery,
 instance),
-               f, NewServiceInstanceRegisteredEvent(epsd.serviceDiscovery, 
instance))
-}
-
-// Update returns the result of serviceDiscovery.Update
-func (epsd *EventPublishingServiceDiscovery) Update(instance 
registry.ServiceInstance) error {
-       f := func() error {
-               return epsd.serviceDiscovery.Update(instance)
-       }
-       return epsd.executeWithEvents(nil, f, nil)
-}
-
-// Unregister unregister the instance and drop 
ServiceInstancePreUnregisteredEvent and ServiceInstanceUnregisteredEvent
-func (epsd *EventPublishingServiceDiscovery) Unregister(instance 
registry.ServiceInstance) error {
-       f := func() error {
-               return epsd.serviceDiscovery.Unregister(instance)
-       }
-       return 
epsd.executeWithEvents(NewServiceInstancePreUnregisteredEvent(epsd.serviceDiscovery,
 instance),
-               f, NewServiceInstanceUnregisteredEvent(epsd.serviceDiscovery, 
instance))
-}
-
-// GetDefaultPageSize returns the result of serviceDiscovery.GetDefaultPageSize
-func (epsd *EventPublishingServiceDiscovery) GetDefaultPageSize() int {
-       return epsd.serviceDiscovery.GetDefaultPageSize()
-}
-
-// GetServices returns the result of serviceDiscovery.GetServices
-func (epsd *EventPublishingServiceDiscovery) GetServices() *gxset.HashSet {
-       return epsd.serviceDiscovery.GetServices()
-}
-
-// GetInstances returns the result of serviceDiscovery.GetInstances
-func (epsd *EventPublishingServiceDiscovery) GetInstances(serviceName string) 
[]registry.ServiceInstance {
-       return epsd.serviceDiscovery.GetInstances(serviceName)
-}
-
-// GetInstancesByPage returns the result of serviceDiscovery.GetInstancesByPage
-func (epsd *EventPublishingServiceDiscovery) GetInstancesByPage(serviceName 
string, offset int, pageSize int) gxpage.Pager {
-       return epsd.serviceDiscovery.GetInstancesByPage(serviceName, offset, 
pageSize)
-}
-
-// GetHealthyInstancesByPage returns the result of 
serviceDiscovery.GetHealthyInstancesByPage
-func (epsd *EventPublishingServiceDiscovery) 
GetHealthyInstancesByPage(serviceName string, offset int, pageSize int, healthy 
bool) gxpage.Pager {
-       return epsd.serviceDiscovery.GetHealthyInstancesByPage(serviceName, 
offset, pageSize, healthy)
-}
-
-// GetRequestInstances returns result from serviceDiscovery.GetRequestInstances
-func (epsd *EventPublishingServiceDiscovery) GetRequestInstances(serviceNames 
[]string, offset int, requestedSize int) map[string]gxpage.Pager {
-       return epsd.serviceDiscovery.GetRequestInstances(serviceNames, offset, 
requestedSize)
-}
-
-// AddListener add event listener
-func (epsd *EventPublishingServiceDiscovery) AddListener(listener 
registry.ServiceInstancesChangedListener) error {
-       extension.GetGlobalDispatcher().AddEventListener(listener)
-       return epsd.serviceDiscovery.AddListener(listener)
-}
-
-// DispatchEventByServiceName pass serviceName to serviceDiscovery
-func (epsd *EventPublishingServiceDiscovery) 
DispatchEventByServiceName(serviceName string) error {
-       return epsd.serviceDiscovery.DispatchEventByServiceName(serviceName)
-}
-
-// DispatchEventForInstances pass params to serviceDiscovery
-func (epsd *EventPublishingServiceDiscovery) 
DispatchEventForInstances(serviceName string, instances 
[]registry.ServiceInstance) error {
-       return epsd.serviceDiscovery.DispatchEventForInstances(serviceName, 
instances)
-}
-
-// DispatchEvent pass the event to serviceDiscovery
-func (epsd *EventPublishingServiceDiscovery) DispatchEvent(event 
*registry.ServiceInstancesChangedEvent) error {
-       return epsd.serviceDiscovery.DispatchEvent(event)
-}
-
-// executeWithEvents dispatch before event and after event if return error 
will dispatch exception event
-func (epsd *EventPublishingServiceDiscovery) executeWithEvents(beforeEvent 
observer.Event, f func() error, afterEvent observer.Event) error {
-       globalDispatcher := extension.GetGlobalDispatcher()
-       if beforeEvent != nil {
-               globalDispatcher.Dispatch(beforeEvent)
-       }
-       if err := f(); err != nil {
-               
globalDispatcher.Dispatch(NewServiceDiscoveryExceptionEvent(epsd, 
epsd.serviceDiscovery, err))
-               return err
-       }
-       if afterEvent != nil {
-               globalDispatcher.Dispatch(afterEvent)
-       }
-       return nil
-}
-
-// getMetadataService returns metadata service instance
-func getMetadataService() (service.MetadataService, error) {
-       return local.GetLocalMetadataService()
-}
diff --git a/registry/event/log_event_listener.go 
b/registry/event/log_event_listener.go
deleted file mode 100644
index 9ccfcdc..0000000
--- a/registry/event/log_event_listener.go
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package event
-
-import (
-       "reflect"
-       "sync"
-)
-
-import (
-       "dubbo.apache.org/dubbo-go/v3/common/extension"
-       "dubbo.apache.org/dubbo-go/v3/common/logger"
-       "dubbo.apache.org/dubbo-go/v3/common/observer"
-)
-
-func init() {
-       extension.AddEventListener(GetLogEventListener)
-}
-
-// logEventListener is singleton
-type logEventListener struct{}
-
-func (l *logEventListener) GetPriority() int {
-       return 0
-}
-
-func (l *logEventListener) OnEvent(e observer.Event) error {
-       logger.Info("Event happen: " + e.String())
-       return nil
-}
-
-func (l *logEventListener) GetEventType() reflect.Type {
-       return reflect.TypeOf(&observer.BaseEvent{})
-}
-
-var (
-       logEventListenerInstance *logEventListener
-       logEventListenerOnce     sync.Once
-)
-
-func GetLogEventListener() observer.EventListener {
-       logEventListenerOnce.Do(func() {
-               logEventListenerInstance = &logEventListener{}
-       })
-       return logEventListenerInstance
-}
diff --git a/registry/event/log_event_listener_test.go 
b/registry/event/log_event_listener_test.go
deleted file mode 100644
index 3136564..0000000
--- a/registry/event/log_event_listener_test.go
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package event
-
-import (
-       "testing"
-)
-
-import (
-       "github.com/stretchr/testify/assert"
-)
-
-func TestLogEventListener(t *testing.T) {
-       l := &logEventListener{}
-       assert.Equal(t, 0, l.GetPriority())
-       assert.Nil(t, l.OnEvent(&ServiceDiscoveryDestroyedEvent{}))
-}
diff --git a/registry/event/metadata_service_url_params_customizer.go 
b/registry/event/metadata_service_url_params_customizer.go
index bfe7fb7..53e79b4 100644
--- a/registry/event/metadata_service_url_params_customizer.go
+++ b/registry/event/metadata_service_url_params_customizer.go
@@ -30,6 +30,7 @@ import (
        "dubbo.apache.org/dubbo-go/v3/common/constant"
        "dubbo.apache.org/dubbo-go/v3/common/extension"
        "dubbo.apache.org/dubbo-go/v3/common/logger"
+       "dubbo.apache.org/dubbo-go/v3/metadata/service/local"
        "dubbo.apache.org/dubbo-go/v3/registry"
 )
 
@@ -54,7 +55,7 @@ func (m *metadataServiceURLParamsMetadataCustomizer) 
GetPriority() int {
 }
 
 func (m *metadataServiceURLParamsMetadataCustomizer) Customize(instance 
registry.ServiceInstance) {
-       ms, err := getMetadataService()
+       ms, err := local.GetLocalMetadataService()
        if err != nil {
                logger.Errorf("could not find the metadata service", err)
                return
diff --git a/registry/event/protocol_ports_metadata_customizer.go 
b/registry/event/protocol_ports_metadata_customizer.go
index 5a6e9ca..058c0b9 100644
--- a/registry/event/protocol_ports_metadata_customizer.go
+++ b/registry/event/protocol_ports_metadata_customizer.go
@@ -26,6 +26,7 @@ import (
        "dubbo.apache.org/dubbo-go/v3/common/constant"
        "dubbo.apache.org/dubbo-go/v3/common/extension"
        "dubbo.apache.org/dubbo-go/v3/common/logger"
+       "dubbo.apache.org/dubbo-go/v3/metadata/service/local"
        "dubbo.apache.org/dubbo-go/v3/registry"
 )
 
@@ -44,7 +45,7 @@ func (p *ProtocolPortsMetadataCustomizer) GetPriority() int {
 
 // Customize put the the string like [{"protocol": "dubbo", "port": 123}] into 
instance's metadata
 func (p *ProtocolPortsMetadataCustomizer) Customize(instance 
registry.ServiceInstance) {
-       metadataService, err := getMetadataService()
+       metadataService, err := local.GetLocalMetadataService()
        if err != nil {
                logger.Errorf("Could not init the MetadataService", err)
                return
diff --git a/registry/event/service_config_exported_event.go 
b/registry/event/service_config_exported_event.go
deleted file mode 100644
index af566e3..0000000
--- a/registry/event/service_config_exported_event.go
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package event
-
-import (
-       "time"
-)
-
-import (
-       "dubbo.apache.org/dubbo-go/v3/common/observer"
-       "dubbo.apache.org/dubbo-go/v3/config"
-)
-
-// ServiceConfigExportedEvent represents an service was exported
-type ServiceConfigExportedEvent struct {
-       observer.BaseEvent
-       ServiceConfig *config.ServiceConfig
-}
-
-// NewServiceConfigExportedEvent create an instance
-func NewServiceConfigExportedEvent(serviceConfig *config.ServiceConfig) 
*ServiceConfigExportedEvent {
-       return &ServiceConfigExportedEvent{
-               BaseEvent: observer.BaseEvent{
-                       Source:    serviceConfig,
-                       Timestamp: time.Now(),
-               },
-               ServiceConfig: serviceConfig,
-       }
-}
diff --git a/registry/event/service_discovery_event.go 
b/registry/event/service_discovery_event.go
deleted file mode 100644
index 8235294..0000000
--- a/registry/event/service_discovery_event.go
+++ /dev/null
@@ -1,103 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package event
-
-import (
-       "dubbo.apache.org/dubbo-go/v3/common/observer"
-       "dubbo.apache.org/dubbo-go/v3/registry"
-)
-
-// ServiceDiscoveryEvent means that something happens to service discovery 
instance
-type ServiceDiscoveryEvent struct {
-       observer.BaseEvent
-       original registry.ServiceDiscovery
-}
-
-// NewServiceDiscoveryEvent returns an instance
-func NewServiceDiscoveryEvent(discovery registry.ServiceDiscovery, original 
registry.ServiceDiscovery) *ServiceDiscoveryEvent {
-       return &ServiceDiscoveryEvent{
-               BaseEvent: *observer.NewBaseEvent(discovery),
-               original:  original,
-       }
-}
-
-// GetServiceDiscovery returns the event source
-func (sde *ServiceDiscoveryEvent) GetServiceDiscovery() 
registry.ServiceDiscovery {
-       return sde.GetSource().(registry.ServiceDiscovery)
-}
-
-// GetOriginal actually I think we can remove this method.
-func (sde *ServiceDiscoveryEvent) GetOriginal() registry.ServiceDiscovery {
-       return sde.original
-}
-
-// ServiceDiscoveryDestroyingEvent
-// this event will be dispatched before service discovery be destroyed
-type ServiceDiscoveryDestroyingEvent struct {
-       ServiceDiscoveryEvent
-}
-
-// ServiceDiscoveryExceptionEvent
-// this event will be dispatched when the error occur in service discovery
-type ServiceDiscoveryExceptionEvent struct {
-       ServiceDiscoveryEvent
-       err error
-}
-
-// ServiceDiscoveryInitializedEvent
-// this event will be dispatched after service discovery initialize
-type ServiceDiscoveryInitializedEvent struct {
-       ServiceDiscoveryEvent
-}
-
-// ServiceDiscoveryInitializingEvent
-// this event will be dispatched before service discovery initialize
-type ServiceDiscoveryInitializingEvent struct {
-       ServiceDiscoveryEvent
-}
-
-// ServiceDiscoveryDestroyedEvent
-// this event will be dispatched after service discovery be destroyed
-type ServiceDiscoveryDestroyedEvent struct {
-       ServiceDiscoveryEvent
-}
-
-// NewServiceDiscoveryDestroyingEvent create a ServiceDiscoveryDestroyingEvent
-func NewServiceDiscoveryDestroyingEvent(discovery registry.ServiceDiscovery, 
original registry.ServiceDiscovery) *ServiceDiscoveryDestroyingEvent {
-       return 
&ServiceDiscoveryDestroyingEvent{*NewServiceDiscoveryEvent(discovery, original)}
-}
-
-// NewServiceDiscoveryExceptionEvent create a ServiceDiscoveryExceptionEvent
-func NewServiceDiscoveryExceptionEvent(discovery registry.ServiceDiscovery, 
original registry.ServiceDiscovery, err error) *ServiceDiscoveryExceptionEvent {
-       return 
&ServiceDiscoveryExceptionEvent{*NewServiceDiscoveryEvent(discovery, original), 
err}
-}
-
-// NewServiceDiscoveryInitializedEvent create a 
ServiceDiscoveryInitializedEvent
-func NewServiceDiscoveryInitializedEvent(discovery registry.ServiceDiscovery, 
original registry.ServiceDiscovery) *ServiceDiscoveryInitializedEvent {
-       return 
&ServiceDiscoveryInitializedEvent{*NewServiceDiscoveryEvent(discovery, 
original)}
-}
-
-// NewServiceDiscoveryInitializingEvent create a 
ServiceDiscoveryInitializingEvent
-func NewServiceDiscoveryInitializingEvent(discovery registry.ServiceDiscovery, 
original registry.ServiceDiscovery) *ServiceDiscoveryInitializingEvent {
-       return 
&ServiceDiscoveryInitializingEvent{*NewServiceDiscoveryEvent(discovery, 
original)}
-}
-
-// NewServiceDiscoveryDestroyedEvent create a ServiceDiscoveryDestroyedEvent
-func NewServiceDiscoveryDestroyedEvent(discovery registry.ServiceDiscovery, 
original registry.ServiceDiscovery) *ServiceDiscoveryDestroyedEvent {
-       return 
&ServiceDiscoveryDestroyedEvent{*NewServiceDiscoveryEvent(discovery, original)}
-}
diff --git a/registry/event/service_instance_event.go 
b/registry/event/service_instance_event.go
deleted file mode 100644
index bac8029..0000000
--- a/registry/event/service_instance_event.go
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package event
-
-import (
-       "dubbo.apache.org/dubbo-go/v3/common/observer"
-       "dubbo.apache.org/dubbo-go/v3/registry"
-)
-
-// ServiceInstanceEvent means something happen to this ServiceInstance
-// like register this service instance
-type ServiceInstanceEvent struct {
-       observer.BaseEvent
-       serviceInstance registry.ServiceInstance
-}
-
-// NewServiceInstanceEvent create a ServiceInstanceEvent
-func NewServiceInstanceEvent(source interface{}, instance 
registry.ServiceInstance) *ServiceInstanceEvent {
-       return &ServiceInstanceEvent{
-               BaseEvent:       *observer.NewBaseEvent(source),
-               serviceInstance: instance,
-       }
-}
-
-// getServiceInstance return the service instance
-func (sie *ServiceInstanceEvent) getServiceInstance() registry.ServiceInstance 
{
-       return sie.serviceInstance
-}
-
-// ServiceInstancePreRegisteredEvent
-// this event will be dispatched before service instance be registered
-type ServiceInstancePreRegisteredEvent struct {
-       ServiceInstanceEvent
-}
-
-// ServiceInstancePreUnregisteredEvent
-// this event will be dispatched before service instance be unregistered
-type ServiceInstancePreUnregisteredEvent struct {
-       ServiceInstanceEvent
-}
-
-// ServiceInstanceRegisteredEvent
-// this event will be dispatched after service instance be registered
-type ServiceInstanceRegisteredEvent struct {
-       ServiceInstanceEvent
-}
-
-// ServiceInstanceRegisteredEvent
-// this event will be dispatched after service instance be unregistered
-type ServiceInstanceUnregisteredEvent struct {
-       ServiceInstanceEvent
-}
-
-// NewServiceInstancePreRegisteredEvent create a 
ServiceInstancePreRegisteredEvent
-func NewServiceInstancePreRegisteredEvent(source interface{}, instance 
registry.ServiceInstance) *ServiceInstancePreRegisteredEvent {
-       return 
&ServiceInstancePreRegisteredEvent{*NewServiceInstanceEvent(source, instance)}
-}
-
-// NewServiceInstancePreUnregisteredEvent create a 
ServiceInstancePreUnregisteredEvent
-func NewServiceInstancePreUnregisteredEvent(source interface{}, instance 
registry.ServiceInstance) *ServiceInstancePreUnregisteredEvent {
-       return 
&ServiceInstancePreUnregisteredEvent{*NewServiceInstanceEvent(source, instance)}
-}
-
-// NewServiceInstanceRegisteredEvent create a ServiceInstanceRegisteredEvent
-func NewServiceInstanceRegisteredEvent(source interface{}, instance 
registry.ServiceInstance) *ServiceInstanceRegisteredEvent {
-       return &ServiceInstanceRegisteredEvent{*NewServiceInstanceEvent(source, 
instance)}
-}
-
-// NewServiceInstanceUnregisteredEvent create a 
ServiceInstanceUnregisteredEvent
-func NewServiceInstanceUnregisteredEvent(source interface{}, instance 
registry.ServiceInstance) *ServiceInstanceUnregisteredEvent {
-       return 
&ServiceInstanceUnregisteredEvent{*NewServiceInstanceEvent(source, instance)}
-}
diff --git a/registry/event/service_name_mapping_listener.go 
b/registry/event/service_name_mapping_listener.go
deleted file mode 100644
index e7d979a..0000000
--- a/registry/event/service_name_mapping_listener.go
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package event
-
-import (
-       "reflect"
-       "sync"
-)
-
-import (
-       perrors "github.com/pkg/errors"
-)
-
-import (
-       "dubbo.apache.org/dubbo-go/v3/common/constant"
-       "dubbo.apache.org/dubbo-go/v3/common/extension"
-       "dubbo.apache.org/dubbo-go/v3/common/observer"
-       "dubbo.apache.org/dubbo-go/v3/metadata/mapping"
-)
-
-func init() {
-       extension.AddEventListener(GetServiceNameMappingListener)
-}
-
-// serviceNameMappingListener listen to service name mapping event
-// usually it means that we exported some service
-// it's a singleton
-type serviceNameMappingListener struct {
-       nameMapping mapping.ServiceNameMapping
-}
-
-// GetPriority return 3, which ensure that this listener will be invoked after 
log listener
-func (s *serviceNameMappingListener) GetPriority() int {
-       return 3
-}
-
-// OnEvent only handle ServiceConfigExportedEvent
-func (s *serviceNameMappingListener) OnEvent(e observer.Event) error {
-       if ex, ok := e.(*ServiceConfigExportedEvent); ok {
-               sc := ex.ServiceConfig
-               urls := sc.GetExportedUrls()
-
-               for _, u := range urls {
-                       err := 
s.nameMapping.Map(u.GetParam(constant.INTERFACE_KEY, ""),
-                               u.GetParam(constant.GROUP_KEY, ""),
-                               u.GetParam(constant.Version, ""),
-                               u.Protocol)
-                       if err != nil {
-                               return perrors.WithMessage(err, "could not map 
the service: "+u.String())
-                       }
-               }
-       }
-       return nil
-}
-
-// GetEventType return ServiceConfigExportedEvent
-func (s *serviceNameMappingListener) GetEventType() reflect.Type {
-       return reflect.TypeOf(&ServiceConfigExportedEvent{})
-}
-
-var (
-       serviceNameMappingListenerInstance *serviceNameMappingListener
-       serviceNameMappingListenerOnce     sync.Once
-)
-
-// GetServiceNameMappingListener returns an instance
-func GetServiceNameMappingListener() observer.EventListener {
-       serviceNameMappingListenerOnce.Do(func() {
-               serviceNameMappingListenerInstance = 
&serviceNameMappingListener{
-                       nameMapping: extension.GetGlobalServiceNameMapping(),
-               }
-       })
-       return serviceNameMappingListenerInstance
-}
diff --git a/registry/event/service_revision_customizer.go 
b/registry/event/service_revision_customizer.go
index 3acf655..77e022c 100644
--- a/registry/event/service_revision_customizer.go
+++ b/registry/event/service_revision_customizer.go
@@ -28,6 +28,7 @@ import (
        "dubbo.apache.org/dubbo-go/v3/common/constant"
        "dubbo.apache.org/dubbo-go/v3/common/extension"
        "dubbo.apache.org/dubbo-go/v3/common/logger"
+       "dubbo.apache.org/dubbo-go/v3/metadata/service/local"
        "dubbo.apache.org/dubbo-go/v3/registry"
 )
 
@@ -47,7 +48,7 @@ func (e *exportedServicesRevisionMetadataCustomizer) 
GetPriority() int {
 
 // Customize calculate the revision for exported urls and then put it into 
instance metadata
 func (e *exportedServicesRevisionMetadataCustomizer) Customize(instance 
registry.ServiceInstance) {
-       ms, err := getMetadataService()
+       ms, err := local.GetLocalMetadataService()
        if err != nil {
                logger.Errorf("could not get metadata service", err)
                return
@@ -74,7 +75,7 @@ func (e *subscribedServicesRevisionMetadataCustomizer) 
GetPriority() int {
 
 // Customize calculate the revision for subscribed urls and then put it into 
instance metadata
 func (e *subscribedServicesRevisionMetadataCustomizer) Customize(instance 
registry.ServiceInstance) {
-       ms, err := getMetadataService()
+       ms, err := local.GetLocalMetadataService()
        if err != nil {
                logger.Errorf("could not get metadata service", err)
                return
diff --git a/registry/file/service_discovery.go 
b/registry/file/service_discovery.go
index b6c7a31..015a45e 100644
--- a/registry/file/service_discovery.go
+++ b/registry/file/service_discovery.go
@@ -268,20 +268,3 @@ func (fssd *fileSystemServiceDiscovery) 
AddListener(listener registry.ServiceIns
        // fssd.dynamicConfiguration.AddListener(listener.ServiceName)
        return nil
 }
-
-// DispatchEventByServiceName dispatches the ServiceInstancesChangedEvent to 
service instance whose name is serviceName
-func (fssd *fileSystemServiceDiscovery) DispatchEventByServiceName(serviceName 
string) error {
-       return 
fssd.DispatchEvent(registry.NewServiceInstancesChangedEvent(serviceName, 
fssd.GetInstances(serviceName)))
-}
-
-// DispatchEventForInstances dispatches the ServiceInstancesChangedEvent to 
target instances
-func (fssd *fileSystemServiceDiscovery) DispatchEventForInstances(serviceName 
string,
-       instances []registry.ServiceInstance) error {
-       return 
fssd.DispatchEvent(registry.NewServiceInstancesChangedEvent(serviceName, 
instances))
-}
-
-// DispatchEvent dispatches the event
-func (fssd *fileSystemServiceDiscovery) DispatchEvent(event 
*registry.ServiceInstancesChangedEvent) error {
-       extension.GetGlobalDispatcher().Dispatch(event)
-       return nil
-}
diff --git a/registry/nacos/service_discovery.go 
b/registry/nacos/service_discovery.go
index 57d4bec..2328b4b 100644
--- a/registry/nacos/service_discovery.go
+++ b/registry/nacos/service_discovery.go
@@ -62,6 +62,9 @@ type nacosServiceDiscovery struct {
        namingClient *nacosClient.NacosNamingClient
        // cache registry instances
        registryInstances []registry.ServiceInstance
+
+       instanceListenerMap map[string]*gxset.HashSet
+       listenerLock        sync.Mutex
 }
 
 // Destroy will close the service discovery.
@@ -210,8 +213,30 @@ func (n *nacosServiceDiscovery) 
GetRequestInstances(serviceNames []string, offse
        return res
 }
 
+func (n *nacosServiceDiscovery) registerInstanceListener(listener registry.ServiceInstancesChangedListener) {
+       n.listenerLock.Lock()
+       defer n.listenerLock.Unlock()
+
+       for _, t := range listener.GetServiceNames().Values() {
+               serviceName, ok := t.(string)
+               if !ok {
+                       logger.Errorf("service name error %s", t)
+                       continue
+               }
+               listenerSet, found := n.instanceListenerMap[serviceName]
+               if !found {
+                       listenerSet = gxset.NewSet(listener)
+                       listenerSet.Add(listener)
+                       n.instanceListenerMap[serviceName] = listenerSet
+               } else {
+                       listenerSet.Add(listener)
+               }
+       }
+}
+
 // AddListener will add a listener
 func (n *nacosServiceDiscovery) AddListener(listener registry.ServiceInstancesChangedListener) error {
+       n.registerInstanceListener(listener)
        for _, t := range listener.GetServiceNames().Values() {
                serviceName := t.(string)
                err := n.namingClient.Client().Subscribe(&vo.SubscribeParam{
@@ -241,7 +266,13 @@ func (n *nacosServiceDiscovery) AddListener(listener 
registry.ServiceInstancesCh
                                        })
                                }
 
-                               e := n.DispatchEventForInstances(serviceName, instances)
+                               var e error
+                               for _, lis := range n.instanceListenerMap[serviceName].Values() {
+                                       var instanceListener registry.ServiceInstancesChangedListener
+                                       instanceListener = lis.(registry.ServiceInstancesChangedListener)
+                                       e = instanceListener.OnEvent(registry.NewServiceInstancesChangedEvent(serviceName, instances))
+                               }
+
                                if e != nil {
                                        logger.Errorf("Dispatching event got exception, service name: %s, err: %v", serviceName, err)
                                }
@@ -254,22 +285,6 @@ func (n *nacosServiceDiscovery) AddListener(listener 
registry.ServiceInstancesCh
        return nil
 }
 
-// DispatchEventByServiceName will dispatch the event for the service with the 
service name
-func (n *nacosServiceDiscovery) DispatchEventByServiceName(serviceName string) 
error {
-       return n.DispatchEventForInstances(serviceName, 
n.GetInstances(serviceName))
-}
-
-// DispatchEventForInstances will dispatch the event to those instances
-func (n *nacosServiceDiscovery) DispatchEventForInstances(serviceName string, 
instances []registry.ServiceInstance) error {
-       return 
n.DispatchEvent(registry.NewServiceInstancesChangedEvent(serviceName, 
instances))
-}
-
-// DispatchEvent will dispatch the event
-func (n *nacosServiceDiscovery) DispatchEvent(event 
*registry.ServiceInstancesChangedEvent) error {
-       extension.GetGlobalDispatcher().Dispatch(event)
-       return nil
-}
-
 // toRegisterInstance convert the ServiceInstance to RegisterInstanceParam
 // the Ephemeral will be true
 func (n *nacosServiceDiscovery) toRegisterInstance(instance 
registry.ServiceInstance) vo.RegisterInstanceParam {
@@ -347,10 +362,11 @@ func newNacosServiceDiscovery(name string) 
(registry.ServiceDiscovery, error) {
        descriptor := fmt.Sprintf("nacos-service-discovery[%s]", rc.Address)
 
        newInstance := &nacosServiceDiscovery{
-               group:             group,
-               namingClient:      client,
-               descriptor:        descriptor,
-               registryInstances: []registry.ServiceInstance{},
+               group:               group,
+               namingClient:        client,
+               descriptor:          descriptor,
+               registryInstances:   []registry.ServiceInstance{},
+               instanceListenerMap: make(map[string]*gxset.HashSet),
        }
        instanceMap[name] = newInstance
        return newInstance, nil
diff --git a/registry/nacos/service_discovery_test.go 
b/registry/nacos/service_discovery_test.go
index 03b1b9c..1c438c5 100644
--- a/registry/nacos/service_discovery_test.go
+++ b/registry/nacos/service_discovery_test.go
@@ -32,8 +32,6 @@ import (
 import (
        "dubbo.apache.org/dubbo-go/v3/common/constant"
        "dubbo.apache.org/dubbo-go/v3/common/extension"
-       "dubbo.apache.org/dubbo-go/v3/common/observer"
-       "dubbo.apache.org/dubbo-go/v3/common/observer/dispatcher"
        "dubbo.apache.org/dubbo-go/v3/config"
        "dubbo.apache.org/dubbo-go/v3/metadata/mapping"
        "dubbo.apache.org/dubbo-go/v3/registry"
@@ -75,15 +73,11 @@ func TestNacosServiceDiscovery_CRUD(t *testing.T) {
                return
        }
        prepareData()
-       extension.SetEventDispatcher("mock", func() observer.EventDispatcher {
-               return dispatcher.NewMockEventDispatcher()
-       })
 
        extension.SetGlobalServiceNameMapping(func() mapping.ServiceNameMapping 
{
                return mapping.NewMockServiceNameMapping()
        })
 
-       extension.SetAndInitGlobalDispatcher("mock")
        rand.Seed(time.Now().Unix())
        serviceName := "service-name" + strconv.Itoa(rand.Intn(10000))
        id := "id"
@@ -153,8 +147,6 @@ func TestNacosServiceDiscovery_CRUD(t *testing.T) {
        assert.Equal(t, "b", v)
 
        // test dispatcher event
-       err = serviceDiscovery.DispatchEventByServiceName(serviceName)
-       assert.Nil(t, err)
        hs := gxset.NewSet()
        hs.Add(serviceName)
        // test AddListener
diff --git a/registry/service_discovery.go b/registry/service_discovery.go
index 70b7b76..c90ff11 100644
--- a/registry/service_discovery.go
+++ b/registry/service_discovery.go
@@ -75,13 +75,4 @@ type ServiceDiscovery interface {
        // AddListener adds a new ServiceInstancesChangedListenerImpl
        // see addServiceInstancesChangedListener in Java
        AddListener(listener ServiceInstancesChangedListener) error
-
-       // DispatchEventByServiceName dispatches the 
ServiceInstancesChangedEvent to service instance whose name is serviceName
-       DispatchEventByServiceName(serviceName string) error
-
-       // DispatchEventForInstances dispatches the 
ServiceInstancesChangedEvent to target instances
-       DispatchEventForInstances(serviceName string, instances 
[]ServiceInstance) error
-
-       // DispatchEvent dispatches the event
-       DispatchEvent(event *ServiceInstancesChangedEvent) error
 }
diff --git a/registry/servicediscovery/service_discovery_registry.go 
b/registry/servicediscovery/service_discovery_registry.go
index 1feb8f1..40a0bf3 100644
--- a/registry/servicediscovery/service_discovery_registry.go
+++ b/registry/servicediscovery/service_discovery_registry.go
@@ -134,7 +134,7 @@ func creatServiceDiscovery(url *common.URL) 
(registry.ServiceDiscovery, error) {
        if err != nil {
                return nil, perrors.WithMessage(err, "Create service discovery 
fialed")
        }
-       return event.NewEventPublishingServiceDiscovery(originServiceDiscovery), nil
+       return originServiceDiscovery, nil
 }
 
 func parseServices(literalServices string) *gxset.HashSet {
@@ -235,20 +235,12 @@ func (s *serviceDiscoveryRegistry) Subscribe(url 
*common.URL, notify registry.No
        }
        s.serviceListeners[serviceNamesKey] = listener
        listener.AddListenerAndNotify(protocolServiceKey, notify)
-       s.registerServiceInstancesChangedListener(url, listener)
-       return nil
-}
 
-func (s *serviceDiscoveryRegistry) registerServiceInstancesChangedListener(url 
*common.URL, listener registry.ServiceInstancesChangedListener) {
-       // FIXME ServiceNames.String() is not good
-       listenerId := listener.GetServiceNames().String() + ":" + getUrlKey(url)
-       if !s.subscribedServices.Contains(listenerId) {
-               err := s.serviceDiscovery.AddListener(listener)
-               if err != nil {
-                       logger.Errorf("add listener[%s] catch error,url:%s 
err:%s", listenerId, url.String(), err.Error())
-               }
+       err = s.serviceDiscovery.AddListener(listener)
+       if err != nil {
+               logger.Errorf("add instance listener catch error,url:%s err:%s", url.String(), err.Error())
        }
-
+       return nil
 }
 
 func getUrlKey(url *common.URL) string {
@@ -352,5 +344,26 @@ func tryInitMetadataService(url *common.URL) {
        if err != nil {
                logger.Errorf("could not export the metadata service", err)
        }
-       extension.GetGlobalDispatcher().Dispatch(event.NewServiceConfigExportedEvent(expt.(*configurable.MetadataServiceExporter).ServiceConfig))
+
+       // report interface-app mapping
+       err = publishMapping(expt.(*configurable.MetadataServiceExporter).ServiceConfig)
+       if err != nil {
+               logger.Errorf("Publish interface-application mapping failed", err)
+       }
+}
+
+// OnEvent only handle ServiceConfigExportedEvent
+func publishMapping(sc *config.ServiceConfig) error {
+       urls := sc.GetExportedUrls()
+
+       for _, u := range urls {
+               err := extension.GetGlobalServiceNameMapping().Map(u.GetParam(constant.INTERFACE_KEY, ""),
+                       u.GetParam(constant.GROUP_KEY, ""),
+                       u.GetParam(constant.Version, ""),
+                       u.Protocol)
+               if err != nil {
+                       return perrors.WithMessage(err, "could not map the service: "+u.String())
+               }
+       }
+       return nil
 }
diff --git a/registry/servicediscovery/service_discovery_registry_test.go 
b/registry/servicediscovery/service_discovery_registry_test.go
index 8c92642..d8b8445 100644
--- a/registry/servicediscovery/service_discovery_registry_test.go
+++ b/registry/servicediscovery/service_discovery_registry_test.go
@@ -30,8 +30,6 @@ import (
 import (
        "dubbo.apache.org/dubbo-go/v3/common"
        "dubbo.apache.org/dubbo-go/v3/common/extension"
-       "dubbo.apache.org/dubbo-go/v3/common/observer"
-       "dubbo.apache.org/dubbo-go/v3/common/observer/dispatcher"
        "dubbo.apache.org/dubbo-go/v3/config"
        "dubbo.apache.org/dubbo-go/v3/metadata/mapping"
        "dubbo.apache.org/dubbo-go/v3/metadata/service"
@@ -59,11 +57,6 @@ func TestServiceDiscoveryRegistry_Register(t *testing.T) {
                return mapping.NewMockServiceNameMapping()
        })
 
-       extension.SetEventDispatcher("mock", func() observer.EventDispatcher {
-               return dispatcher.NewMockEventDispatcher()
-       })
-       extension.SetAndInitGlobalDispatcher("mock")
-
        config.GetBaseConfig().ServiceDiscoveries["mock"] = 
&config.ServiceDiscoveryConfig{
                Protocol: "mock",
        }
diff --git a/registry/zookeeper/service_discovery.go 
b/registry/zookeeper/service_discovery.go
index 15cf134..b1909ae 100644
--- a/registry/zookeeper/service_discovery.go
+++ b/registry/zookeeper/service_discovery.go
@@ -64,13 +64,14 @@ type zookeeperServiceDiscovery struct {
        client *gxzookeeper.ZookeeperClient
        csd    *curator_discovery.ServiceDiscovery
        // listener    *zookeeper.ZkEventListener
-       url         *common.URL
-       wg          sync.WaitGroup
-       cltLock     sync.Mutex
-       listenLock  sync.Mutex
-       done        chan struct{}
-       rootPath    string
-       listenNames []string
+       url                 *common.URL
+       wg                  sync.WaitGroup
+       cltLock             sync.Mutex
+       listenLock          sync.Mutex
+       done                chan struct{}
+       rootPath            string
+       listenNames         []string
+       instanceListenerMap map[string]*gxset.HashSet
 }
 
 // newZookeeperServiceDiscovery the constructor of newZookeeperServiceDiscovery
@@ -105,8 +106,9 @@ func newZookeeperServiceDiscovery(name string) 
(registry.ServiceDiscovery, error
                common.WithParamsValue(constant.CONFIG_TIMEOUT_KEY, 
remoteConfig.TimeoutStr))
        url.Location = remoteConfig.Address
        zksd := &zookeeperServiceDiscovery{
-               url:      url,
-               rootPath: rootPath,
+               url:                 url,
+               rootPath:            rootPath,
+               instanceListenerMap: make(map[string]*gxset.HashSet),
        }
        err := zookeeper.ValidateZookeeperClient(zksd, url.Location)
        if err != nil {
@@ -272,6 +274,7 @@ func (zksd *zookeeperServiceDiscovery) 
GetRequestInstances(serviceNames []string
 func (zksd *zookeeperServiceDiscovery) AddListener(listener registry.ServiceInstancesChangedListener) error {
        zksd.listenLock.Lock()
        defer zksd.listenLock.Unlock()
+
        for _, t := range listener.GetServiceNames().Values() {
                serviceName, ok := t.(string)
                if !ok {
@@ -279,23 +282,24 @@ func (zksd *zookeeperServiceDiscovery) 
AddListener(listener registry.ServiceInst
                        continue
                }
                zksd.listenNames = append(zksd.listenNames, serviceName)
-               zksd.csd.ListenServiceEvent(serviceName, zksd)
+               listenerSet, found := zksd.instanceListenerMap[serviceName]
+               if !found {
+                       listenerSet = gxset.NewSet(listener)
+                       listenerSet.Add(listener)
+                       zksd.instanceListenerMap[serviceName] = listenerSet
+               } else {
+                       listenerSet.Add(listener)
+               }
        }
-       return nil
-}
 
-func (zksd *zookeeperServiceDiscovery) DispatchEventByServiceName(serviceName 
string) error {
-       return zksd.DispatchEventForInstances(serviceName, 
zksd.GetInstances(serviceName))
-}
-
-// DispatchEventForInstances dispatch ServiceInstancesChangedEvent
-func (zksd *zookeeperServiceDiscovery) DispatchEventForInstances(serviceName 
string, instances []registry.ServiceInstance) error {
-       return 
zksd.DispatchEvent(registry.NewServiceInstancesChangedEvent(serviceName, 
instances))
-}
-
-// nolint
-func (zksd *zookeeperServiceDiscovery) DispatchEvent(event 
*registry.ServiceInstancesChangedEvent) error {
-       extension.GetGlobalDispatcher().Dispatch(event)
+       for _, t := range listener.GetServiceNames().Values() {
+               serviceName, ok := t.(string)
+               if !ok {
+                       logger.Errorf("service name error %s", t)
+                       continue
+               }
+               zksd.csd.ListenServiceEvent(serviceName, zksd)
+       }
        return nil
 }
 
@@ -306,7 +310,15 @@ func (zksd *zookeeperServiceDiscovery) 
DataChange(eventType remoting.Event) bool
        path = strings.TrimPrefix(path, constant.PATH_SEPARATOR)
        // get service name in zk path
        serviceName := strings.Split(path, constant.PATH_SEPARATOR)[0]
-       err := zksd.DispatchEventByServiceName(serviceName)
+
+       var err error
+       instances := zksd.GetInstances(serviceName)
+       for _, lis := range zksd.instanceListenerMap[serviceName].Values() {
+               var instanceListener registry.ServiceInstancesChangedListener
+               instanceListener = lis.(registry.ServiceInstancesChangedListener)
+               err = instanceListener.OnEvent(registry.NewServiceInstancesChangedEvent(serviceName, instances))
+       }
+
        if err != nil {
                logger.Errorf("[zkServiceDiscovery] DispatchEventByServiceName{%s} error = err{%v}", serviceName, err)
                return false

Reply via email to