This is an automated email from the ASF dual-hosted git repository.

tianxiaoliang pushed a commit to branch v1.x
in repository https://gitbox.apache.org/repos/asf/servicecomb-service-center.git


The following commit(s) were added to refs/heads/v1.x by this push:
     new b124ab5  Issue: Refactor event bus (#976)
b124ab5 is described below

commit b124ab50eaf7bcecc01ffb1ce6da092685de9d0b
Author: little-cui <[email protected]>
AuthorDate: Thu May 13 09:52:34 2021 +0800

    Issue: Refactor event bus (#976)
    
    * SCB-2094 Refactor notification center (#741)
    
    * SCB-2094 Refactor notification center
    
    * SCB-2094 Add pkg description
    
    (cherry picked from commit 600b32303ad6fbda4cdebe29737bad8bb8978bdc)
    
    * SCB-2176 Pick master branch
    
    * SCB-2176 Refactor event bus
---
 pkg/{notify/processor.go => event/bus.go}          | 58 ++++++++++---------
 .../bus_service.go}                                | 67 +++++++++++-----------
 .../bus_service_test.go}                           | 12 ++--
 .../processor_test.go => event/bus_test.go}        | 12 ++--
 pkg/{notify => event}/common.go                    |  4 +-
 pkg/{notify/notice.go => event/event.go}           |  2 +-
 pkg/{notify/notice_test.go => event/event_test.go} |  6 +-
 pkg/{notify/subject.go => event/poster.go}         | 41 +++++++------
 .../subject_test.go => event/poster_test.go}       | 16 +++---
 pkg/{notify => event}/subscriber.go                | 28 ++++-----
 .../subscriber_checker.go}                         | 32 +++++------
 pkg/{notify/group.go => event/subscriber_group.go} | 38 ++++++------
 .../subscriber_group_test.go}                      | 25 ++++----
 pkg/{notify => event}/types.go                     |  6 +-
 pkg/{notify => event}/types_test.go                |  4 +-
 pkg/queue/taskqueue.go                             |  6 +-
 pkg/queue/taskqueue_test.go                        | 10 ++--
 pkg/notify/common.go => server/alarm/center.go     | 15 +++--
 server/alarm/common.go                             |  4 +-
 server/alarm/model/types.go                        |  2 +-
 server/alarm/service.go                            | 17 ++----
 .../common.go => server/connection/connection.go   | 12 ++--
 server/{notify => connection/grpc}/stream.go       | 56 +++++++++---------
 server/{notify => connection/grpc}/stream_test.go  | 14 +++--
 server/{notify => connection}/metrics.go           | 36 ++++++------
 server/{notify => connection/ws}/publisher.go      |  8 ++-
 server/{notify => connection/ws}/websocket.go      | 63 +++++++++++---------
 server/{notify => connection/ws}/websocket_test.go | 50 +++++++---------
 server/{notify => event}/center.go                 | 16 +++---
 .../instance_subscriber.go}                        | 29 ++++++----
 server/health/health_test.go                       |  4 +-
 server/notify/common.go                            | 29 ----------
 server/plugin/discovery/k8s/adaptor/listwatcher.go |  6 +-
 server/server.go                                   | 20 +++----
 server/service/event/instance_event_handler.go     | 10 ++--
 server/service/event/rule_event_handler.go         |  4 +-
 server/service/event/tag_event_handler.go          |  4 +-
 server/service/watch.go                            | 13 +++--
 38 files changed, 386 insertions(+), 393 deletions(-)

diff --git a/pkg/notify/processor.go b/pkg/event/bus.go
similarity index 54%
rename from pkg/notify/processor.go
rename to pkg/event/bus.go
index 5b24885..3735b0a 100644
--- a/pkg/notify/processor.go
+++ b/pkg/event/bus.go
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package notify
+package event
 
 import (
        "context"
@@ -23,74 +23,76 @@ import (
        "github.com/apache/servicecomb-service-center/pkg/util"
 )
 
-type Processor struct {
+// Bus can fire the event aync and dispatch events to subscriber according to 
subject
+type Bus struct {
        *queue.TaskQueue
 
        name     string
        subjects *util.ConcurrentMap
 }
 
-func (p *Processor) Name() string {
-       return p.name
+func (bus *Bus) Name() string {
+       return bus.name
 }
 
-func (p *Processor) Accept(job Event) {
-       p.Add(queue.Task{Object: job})
+func (bus *Bus) Fire(evt Event) {
+       // TODO add option if queue is full
+       bus.Add(queue.Task{Payload: evt})
 }
 
-func (p *Processor) Handle(ctx context.Context, obj interface{}) {
-       p.Notify(obj.(Event))
+func (bus *Bus) Handle(ctx context.Context, evt interface{}) {
+       bus.fireAtOnce(evt.(Event))
 }
 
-func (p *Processor) Notify(job Event) {
-       if itf, ok := p.subjects.Get(job.Subject()); ok {
-               itf.(*Subject).Notify(job)
+func (bus *Bus) fireAtOnce(evt Event) {
+       if itf, ok := bus.subjects.Get(evt.Subject()); ok {
+               itf.(*Poster).Post(evt)
        }
 }
 
-func (p *Processor) Subjects(name string) *Subject {
-       itf, ok := p.subjects.Get(name)
+func (bus *Bus) Subjects(name string) *Poster {
+       itf, ok := bus.subjects.Get(name)
        if !ok {
                return nil
        }
-       return itf.(*Subject)
+       return itf.(*Poster)
 }
 
-func (p *Processor) AddSubscriber(n Subscriber) {
-       item, _ := p.subjects.Fetch(n.Subject(), func() (interface{}, error) {
-               return NewSubject(n.Subject()), nil
+func (bus *Bus) AddSubscriber(n Subscriber) {
+       item, _ := bus.subjects.Fetch(n.Subject(), func() (interface{}, error) {
+               return NewPoster(n.Subject()), nil
        })
-       item.(*Subject).GetOrNewGroup(n.Group()).AddSubscriber(n)
+       item.(*Poster).GetOrNewGroup(n.Group()).AddMember(n)
 }
 
-func (p *Processor) Remove(n Subscriber) {
-       itf, ok := p.subjects.Get(n.Subject())
+func (bus *Bus) RemoveSubscriber(n Subscriber) {
+       itf, ok := bus.subjects.Get(n.Subject())
        if !ok {
                return
        }
 
-       s := itf.(*Subject)
+       s := itf.(*Poster)
        g := s.Groups(n.Group())
        if g == nil {
                return
        }
 
-       g.Remove(n.ID())
+       g.RemoveMember(n.ID())
 
        if g.Size() == 0 {
-               s.Remove(g.Name())
+               s.RemoveGroup(g.Name())
        }
        if s.Size() == 0 {
-               p.subjects.Remove(s.Name())
+               bus.subjects.Remove(s.Subject())
        }
 }
 
-func (p *Processor) Clear() {
-       p.subjects.Clear()
+func (bus *Bus) Clear() {
+       bus.subjects.Clear()
 }
 
-func NewProcessor(name string, queueSize int) *Processor {
-       p := &Processor{
+func NewBus(name string, queueSize int) *Bus {
+       p := &Bus{
                TaskQueue: queue.NewTaskQueue(queueSize),
                name:      name,
                subjects:  util.NewConcurrentMap(0),
diff --git a/pkg/notify/notification_service.go b/pkg/event/bus_service.go
similarity index 64%
rename from pkg/notify/notification_service.go
rename to pkg/event/bus_service.go
index 658d6ab..b5c8cd2 100644
--- a/pkg/notify/notification_service.go
+++ b/pkg/event/bus_service.go
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package notify
+package event
 
 import (
        "errors"
@@ -24,15 +24,18 @@ import (
        "sync"
 )
 
-type Service struct {
-       processors map[Type]*Processor
-       mux        sync.RWMutex
-       isClose    bool
+// BusService is the daemon service to manage multiple type Bus
+// and wrap handle methods of Bus
+type BusService struct {
+       // buses is the map of event handler, key is event source type
+       buses   map[Type]*Bus
+       mux     sync.RWMutex
+       isClose bool
 }
 
-func (s *Service) newProcessor(t Type) *Processor {
+func (s *BusService) newBus(t Type) *Bus {
        s.mux.RLock()
-       p, ok := s.processors[t]
+       p, ok := s.buses[t]
        if ok {
                s.mux.RUnlock()
                return p
@@ -40,20 +43,20 @@ func (s *Service) newProcessor(t Type) *Processor {
        s.mux.RUnlock()
 
        s.mux.Lock()
-       p, ok = s.processors[t]
+       p, ok = s.buses[t]
        if ok {
                s.mux.Unlock()
                return p
        }
-       p = NewProcessor(t.String(), t.QueueSize())
-       s.processors[t] = p
+       p = NewBus(t.String(), t.QueueSize())
+       s.buses[t] = p
        s.mux.Unlock()
 
        p.Run()
        return p
 }
 
-func (s *Service) Start() {
+func (s *BusService) Start() {
        if !s.Closed() {
                log.Warnf("notify service is already running")
                return
@@ -63,7 +66,7 @@ func (s *Service) Start() {
        s.mux.Unlock()
 
        // 错误subscriber清理
-       err := s.AddSubscriber(NewNotifyServiceHealthChecker())
+       err := s.AddSubscriber(NewSubscriberHealthChecker())
        if err != nil {
                log.Error("", err)
        }
@@ -71,7 +74,7 @@ func (s *Service) Start() {
        log.Debugf("notify service is started")
 }
 
-func (s *Service) AddSubscriber(n Subscriber) error {
+func (s *BusService) AddSubscriber(n Subscriber) error {
        if n == nil {
                err := errors.New("required Subscriber")
                log.Errorf(err, "add subscriber failed")
@@ -84,30 +87,30 @@ func (s *Service) AddSubscriber(n Subscriber) error {
                return err
        }
 
-       p := s.newProcessor(n.Type())
-       n.SetService(s)
+       p := s.newBus(n.Type())
+       n.SetBus(s)
        n.OnAccept()
 
        p.AddSubscriber(n)
        return nil
 }
 
-func (s *Service) RemoveSubscriber(n Subscriber) {
+func (s *BusService) RemoveSubscriber(n Subscriber) {
        s.mux.RLock()
-       p, ok := s.processors[n.Type()]
+       p, ok := s.buses[n.Type()]
        if !ok {
                s.mux.RUnlock()
                return
        }
        s.mux.RUnlock()
 
-       p.Remove(n)
+       p.RemoveSubscriber(n)
        n.Close()
 }
 
-func (s *Service) stopProcessors() {
+func (s *BusService) closeBuses() {
        s.mux.RLock()
-       for _, p := range s.processors {
+       for _, p := range s.buses {
                p.Clear()
                p.Stop()
        }
@@ -115,30 +118,30 @@ func (s *Service) stopProcessors() {
 }
 
 //通知内容塞到队列里
-func (s *Service) Publish(job Event) error {
+func (s *BusService) Fire(evt Event) error {
        if s.Closed() {
-               return errors.New("add notify job failed for server shutdown")
+               return errors.New("add notify evt failed for server shutdown")
        }
 
        s.mux.RLock()
-       p, ok := s.processors[job.Type()]
+       bus, ok := s.buses[evt.Type()]
        if !ok {
                s.mux.RUnlock()
-               return fmt.Errorf("unknown job type[%s]", job.Type())
+               return fmt.Errorf("unknown evt type[%s]", evt.Type())
        }
        s.mux.RUnlock()
-       p.Accept(job)
+       bus.Fire(evt)
        return nil
 }
 
-func (s *Service) Closed() (b bool) {
+func (s *BusService) Closed() (b bool) {
        s.mux.RLock()
        b = s.isClose
        s.mux.RUnlock()
        return
 }
 
-func (s *Service) Stop() {
+func (s *BusService) Stop() {
        if s.Closed() {
                return
        }
@@ -146,14 +149,14 @@ func (s *Service) Stop() {
        s.isClose = true
        s.mux.Unlock()
 
-       s.stopProcessors()
+       s.closeBuses()
 
        log.Debug("notify service stopped")
 }
 
-func NewNotifyService() *Service {
-       return &Service{
-               processors: make(map[Type]*Processor),
-               isClose:    true,
+func NewBusService() *BusService {
+       return &BusService{
+               buses:   make(map[Type]*Bus),
+               isClose: true,
        }
 }
diff --git a/pkg/notify/notification_test.go b/pkg/event/bus_service_test.go
similarity index 88%
rename from pkg/notify/notification_test.go
rename to pkg/event/bus_service_test.go
index f2af609..752101d 100644
--- a/pkg/notify/notification_test.go
+++ b/pkg/event/bus_service_test.go
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package notify
+package event
 
 import (
        simple "github.com/apache/servicecomb-service-center/pkg/time"
@@ -26,7 +26,7 @@ import (
 func TestGetNotifyService(t *testing.T) {
        INSTANCE := RegisterType("INSTANCE", 1)
 
-       notifyService := NewNotifyService()
+       notifyService := NewBusService()
        if notifyService == nil {
                t.Fatalf("TestGetNotifyService failed")
        }
@@ -38,7 +38,7 @@ func TestGetNotifyService(t *testing.T) {
        if err == nil {
                t.Fatalf("TestGetNotifyService failed")
        }
-       err = notifyService.Publish(nil)
+       err = notifyService.Fire(nil)
        if err == nil {
                t.Fatalf("TestGetNotifyService failed")
        }
@@ -62,15 +62,15 @@ func TestGetNotifyService(t *testing.T) {
                t.Fatalf("TestGetNotifyService failed, %v", err)
        }
        j := &baseEvent{INSTANCE, "s", "g", simple.FromTime(time.Now())}
-       err = notifyService.Publish(j)
+       err = notifyService.Fire(j)
        if err != nil {
                t.Fatalf("TestGetNotifyService failed")
        }
-       err = 
notifyService.Publish(NewNotifyServiceHealthCheckJob(NewNotifyServiceHealthChecker()))
+       err = 
notifyService.Fire(NewUnhealthyEvent(NewSubscriberHealthChecker()))
        if err != nil {
                t.Fatalf("TestGetNotifyService failed")
        }
-       err = notifyService.Publish(NewNotifyServiceHealthCheckJob(s))
+       err = notifyService.Fire(NewUnhealthyEvent(s))
        if err != nil {
                t.Fatalf("TestGetNotifyService failed")
        }
diff --git a/pkg/notify/processor_test.go b/pkg/event/bus_test.go
similarity index 91%
rename from pkg/notify/processor_test.go
rename to pkg/event/bus_test.go
index 894231b..a5e214b 100644
--- a/pkg/notify/processor_test.go
+++ b/pkg/event/bus_test.go
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package notify
+package event
 
 import (
        "testing"
@@ -37,7 +37,7 @@ func TestProcessor_Do(t *testing.T) {
                job: make(chan Event, 1)}
        mock2 := &mockSubscriberChan{Subscriber: NewSubscriber(INSTANCE, "s1", 
"g2"),
                job: make(chan Event, 1)}
-       p := NewProcessor("p1", 0)
+       p := NewBus("p1", 0)
        if p.Name() != "p1" {
                t.Fatalf("TestProcessor_Do")
        }
@@ -45,12 +45,12 @@ func TestProcessor_Do(t *testing.T) {
                t.Fatalf("TestProcessor_Do")
        }
        p.AddSubscriber(mock1)
-       if 
p.Subjects(mock1.Subject()).Groups(mock1.Group()).Subscribers(mock1.ID()) != 
mock1 {
+       if p.Subjects(mock1.Subject()).Groups(mock1.Group()).Member(mock1.ID()) 
!= mock1 {
                t.Fatalf("TestProcessor_Do")
        }
-       p.Remove(NewSubscriber(INSTANCE, "s2", "g1"))
-       p.Remove(NewSubscriber(INSTANCE, "s1", "g2"))
-       p.Remove(mock1)
+       p.RemoveSubscriber(NewSubscriber(INSTANCE, "s2", "g1"))
+       p.RemoveSubscriber(NewSubscriber(INSTANCE, "s1", "g2"))
+       p.RemoveSubscriber(mock1)
        if p.Subjects(mock1.Subject()) != nil {
                t.Fatalf("TestProcessor_Do")
        }
diff --git a/pkg/notify/common.go b/pkg/event/common.go
similarity index 95%
copy from pkg/notify/common.go
copy to pkg/event/common.go
index c98e31c..3b2f115 100644
--- a/pkg/notify/common.go
+++ b/pkg/event/common.go
@@ -15,12 +15,12 @@
  * limitations under the License.
  */
 
-package notify
+package event
 
 const (
        DefaultQueueSize = 1000
 )
 
 const (
-       NOTIFTY Type = iota
+       INNER Type = iota
 )
diff --git a/pkg/notify/notice.go b/pkg/event/event.go
similarity index 99%
rename from pkg/notify/notice.go
rename to pkg/event/event.go
index 8015d7d..eeac4ce 100644
--- a/pkg/notify/notice.go
+++ b/pkg/event/event.go
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package notify
+package event
 
 import (
        simple "github.com/apache/servicecomb-service-center/pkg/time"
diff --git a/pkg/notify/notice_test.go b/pkg/event/event_test.go
similarity index 89%
rename from pkg/notify/notice_test.go
rename to pkg/event/event_test.go
index 5ad78d2..ed181fb 100644
--- a/pkg/notify/notice_test.go
+++ b/pkg/event/event_test.go
@@ -13,7 +13,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-package notify
+package event
 
 import (
        "fmt"
@@ -21,13 +21,13 @@ import (
 )
 
 func TestNewEventWithTime(t *testing.T) {
-       evt := NewEvent(NOTIFTY, "a", "b")
+       evt := NewEvent(INNER, "a", "b")
        if evt.CreateAt().UnixNano() == 0 {
                t.Fatal("TestNewEventWithTime")
        }
        fmt.Println(evt.CreateAt())
 
-       if evt.Type() != NOTIFTY || evt.Subject() != "a" || evt.Group() != "b" {
+       if evt.Type() != INNER || evt.Subject() != "a" || evt.Group() != "b" {
                t.Fatal("TestNewEventWithTime")
        }
 }
diff --git a/pkg/notify/subject.go b/pkg/event/poster.go
similarity index 66%
rename from pkg/notify/subject.go
rename to pkg/event/poster.go
index 4f79ba9..4cb3f49 100644
--- a/pkg/notify/subject.go
+++ b/pkg/event/poster.go
@@ -15,25 +15,32 @@
  * limitations under the License.
  */
 
-package notify
+package event
 
 import (
        "github.com/apache/servicecomb-service-center/pkg/util"
 )
 
-type Subject struct {
-       name   string
-       groups *util.ConcurrentMap
+// Poster post the events of specified subject to the subscribers
+type Poster struct {
+       subject string
+       groups  *util.ConcurrentMap
 }
 
-func (s *Subject) Name() string {
-       return s.name
+func (s *Poster) Subject() string {
+       return s.subject
 }
 
-func (s *Subject) Notify(job Event) {
+func (s *Poster) Post(job Event) {
+       f := func(g *Group) {
+               g.ForEach(func(m Subscriber) {
+                       m.OnMessage(job)
+               })
+       }
+
        if len(job.Group()) == 0 {
                s.groups.ForEach(func(item util.MapItem) (next bool) {
-                       item.Value.(*Group).Notify(job)
+                       f(item.Value.(*Group))
                        return true
                })
                return
@@ -43,10 +50,10 @@ func (s *Subject) Notify(job Event) {
        if !ok {
                return
        }
-       itf.(*Group).Notify(job)
+       f(itf.(*Group))
 }
 
-func (s *Subject) Groups(name string) *Group {
+func (s *Poster) Groups(name string) *Group {
        g, ok := s.groups.Get(name)
        if !ok {
                return nil
@@ -54,24 +61,24 @@ func (s *Subject) Groups(name string) *Group {
        return g.(*Group)
 }
 
-func (s *Subject) GetOrNewGroup(name string) *Group {
+func (s *Poster) GetOrNewGroup(name string) *Group {
        item, _ := s.groups.Fetch(name, func() (interface{}, error) {
                return NewGroup(name), nil
        })
        return item.(*Group)
 }
 
-func (s *Subject) Remove(name string) {
+func (s *Poster) RemoveGroup(name string) {
        s.groups.Remove(name)
 }
 
-func (s *Subject) Size() int {
+func (s *Poster) Size() int {
        return s.groups.Size()
 }
 
-func NewSubject(name string) *Subject {
-       return &Subject{
-               name:   name,
-               groups: util.NewConcurrentMap(0),
+func NewPoster(subject string) *Poster {
+       return &Poster{
+               subject: subject,
+               groups:  util.NewConcurrentMap(0),
        }
 }
diff --git a/pkg/notify/subject_test.go b/pkg/event/poster_test.go
similarity index 92%
rename from pkg/notify/subject_test.go
rename to pkg/event/poster_test.go
index c9e955c..931b5de 100644
--- a/pkg/notify/subject_test.go
+++ b/pkg/event/poster_test.go
@@ -14,13 +14,13 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package notify
+package event
 
 import "testing"
 
 func TestSubject_Fetch(t *testing.T) {
-       s := NewSubject("s1")
-       if s.Name() != "s1" {
+       s := NewPoster("s1")
+       if s.Subject() != "s1" {
                t.Fatalf("TestSubject_Fetch failed")
        }
        g := s.GetOrNewGroup("g1")
@@ -37,7 +37,7 @@ func TestSubject_Fetch(t *testing.T) {
        if s.Size() != 2 {
                t.Fatalf("TestSubject_Fetch failed")
        }
-       s.Remove(o.Name())
+       s.RemoveGroup(o.Name())
        if s.Groups("g2") != nil {
                t.Fatalf("TestSubject_Fetch failed")
        }
@@ -47,19 +47,19 @@ func TestSubject_Fetch(t *testing.T) {
        INSTANCE := RegisterType("INSTANCE", 1)
        mock1 := &mockSubscriber{Subscriber: NewSubscriber(INSTANCE, "s1", 
"g1")}
        mock2 := &mockSubscriber{Subscriber: NewSubscriber(INSTANCE, "s1", 
"g2")}
-       g.AddSubscriber(mock1)
+       g.AddMember(mock1)
        job := &baseEvent{group: "g3"}
-       s.Notify(job)
+       s.Post(job)
        if mock1.job != nil || mock2.job != nil {
                t.Fatalf("TestSubject_Fetch failed")
        }
        job.group = "g1"
-       s.Notify(job)
+       s.Post(job)
        if mock1.job != job || mock2.job != nil {
                t.Fatalf("TestSubject_Fetch failed")
        }
        job.group = ""
-       s.Notify(job)
+       s.Post(job)
        if mock1.job != job && mock2.job != job {
                t.Fatalf("TestSubject_Fetch failed")
        }
diff --git a/pkg/notify/subscriber.go b/pkg/event/subscriber.go
similarity index 67%
rename from pkg/notify/subscriber.go
rename to pkg/event/subscriber.go
index 85eabcd..1a3048b 100644
--- a/pkg/notify/subscriber.go
+++ b/pkg/event/subscriber.go
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package notify
+package event
 
 import (
        "errors"
@@ -27,8 +27,8 @@ type Subscriber interface {
        Subject() string
        Group() string
        Type() Type
-       Service() *Service
-       SetService(*Service)
+       Bus() *BusService
+       SetBus(*BusService)
 
        Err() error
        SetError(err error)
@@ -44,20 +44,20 @@ type baseSubscriber struct {
        id      string
        subject string
        group   string
-       service *Service
+       service *BusService
        err     error
 }
 
-func (s *baseSubscriber) ID() string              { return s.id }
-func (s *baseSubscriber) Subject() string         { return s.subject }
-func (s *baseSubscriber) Group() string           { return s.group }
-func (s *baseSubscriber) Type() Type              { return s.nType }
-func (s *baseSubscriber) Service() *Service       { return s.service }
-func (s *baseSubscriber) SetService(svc *Service) { s.service = svc }
-func (s *baseSubscriber) Err() error              { return s.err }
-func (s *baseSubscriber) SetError(err error)      { s.err = err }
-func (s *baseSubscriber) Close()                  {}
-func (s *baseSubscriber) OnAccept()               {}
+func (s *baseSubscriber) ID() string             { return s.id }
+func (s *baseSubscriber) Subject() string        { return s.subject }
+func (s *baseSubscriber) Group() string          { return s.group }
+func (s *baseSubscriber) Type() Type             { return s.nType }
+func (s *baseSubscriber) Bus() *BusService       { return s.service }
+func (s *baseSubscriber) SetBus(svc *BusService) { s.service = svc }
+func (s *baseSubscriber) Err() error             { return s.err }
+func (s *baseSubscriber) SetError(err error)     { s.err = err }
+func (s *baseSubscriber) Close()                 {}
+func (s *baseSubscriber) OnAccept()              {}
 func (s *baseSubscriber) OnMessage(job Event) {
        s.SetError(errors.New("do not call base notifier OnMessage method"))
 }
diff --git a/pkg/notify/notification_healthchecker.go 
b/pkg/event/subscriber_checker.go
similarity index 65%
rename from pkg/notify/notification_healthchecker.go
rename to pkg/event/subscriber_checker.go
index 2466d8c..28c4cb6 100644
--- a/pkg/notify/notification_healthchecker.go
+++ b/pkg/event/subscriber_checker.go
@@ -15,30 +15,30 @@
  * limitations under the License.
  */
 
-package notify
+package event
 
 import "github.com/apache/servicecomb-service-center/pkg/log"
 
 const (
-       ServerCheckerName  = "__HealthChecker__"
-       ServerCheckSubject = "__NotifyServerHealthCheck__"
+       CheckerGroup   = "__HealthChecker__"
+       CheckerSubject = "__NotifyServerHealthCheck__"
 )
 
-//Notifier 健康检查
-type ServiceHealthChecker struct {
+// SubscriberHealthChecker check subscriber health and remove it from bus if 
return Err
+type SubscriberHealthChecker struct {
        Subscriber
 }
 
-type ServiceHealthCheckJob struct {
+type UnhealthyEvent struct {
        Event
        ErrorSubscriber Subscriber
 }
 
-func (s *ServiceHealthChecker) OnMessage(job Event) {
-       j := job.(*ServiceHealthCheckJob)
+func (s *SubscriberHealthChecker) OnMessage(evt Event) {
+       j := evt.(*UnhealthyEvent)
        err := j.ErrorSubscriber.Err()
 
-       if j.ErrorSubscriber.Type() == NOTIFTY {
+       if j.ErrorSubscriber.Type() == INNER {
                log.Errorf(nil, "remove %s watcher failed, here cause a dead 
lock, subject: %s, group: %s",
                        j.ErrorSubscriber.Type(), j.ErrorSubscriber.Subject(), 
j.ErrorSubscriber.Group())
                return
@@ -46,18 +46,18 @@ func (s *ServiceHealthChecker) OnMessage(job Event) {
 
        log.Debugf("notification service remove %s watcher, error: %v, subject: 
%s, group: %s",
                j.ErrorSubscriber.Type(), err, j.ErrorSubscriber.Subject(), 
j.ErrorSubscriber.Group())
-       s.Service().RemoveSubscriber(j.ErrorSubscriber)
+       s.Bus().RemoveSubscriber(j.ErrorSubscriber)
 }
 
-func NewNotifyServiceHealthChecker() *ServiceHealthChecker {
-       return &ServiceHealthChecker{
-               Subscriber: NewSubscriber(NOTIFTY, ServerCheckSubject, 
ServerCheckerName),
+func NewSubscriberHealthChecker() *SubscriberHealthChecker {
+       return &SubscriberHealthChecker{
+               Subscriber: NewSubscriber(INNER, CheckerSubject, CheckerGroup),
        }
 }
 
-func NewNotifyServiceHealthCheckJob(s Subscriber) *ServiceHealthCheckJob {
-       return &ServiceHealthCheckJob{
-               Event:           NewEvent(NOTIFTY, ServerCheckSubject, 
ServerCheckerName),
+func NewUnhealthyEvent(s Subscriber) *UnhealthyEvent {
+       return &UnhealthyEvent{
+               Event:           NewEvent(INNER, CheckerSubject, CheckerGroup),
                ErrorSubscriber: s,
        }
 }
diff --git a/pkg/notify/group.go b/pkg/event/subscriber_group.go
similarity index 65%
rename from pkg/notify/group.go
rename to pkg/event/subscriber_group.go
index c600bba..0c41f4a 100644
--- a/pkg/notify/group.go
+++ b/pkg/event/subscriber_group.go
@@ -15,51 +15,51 @@
  * limitations under the License.
  */
 
-package notify
+package event
 
 import (
        "github.com/apache/servicecomb-service-center/pkg/util"
 )
 
 type Group struct {
-       name        string
-       subscribers *util.ConcurrentMap
+       name    string
+       members *util.ConcurrentMap
 }
 
 func (g *Group) Name() string {
        return g.name
 }
 
-func (g *Group) Notify(job Event) {
-       g.subscribers.ForEach(func(item util.MapItem) (next bool) {
-               item.Value.(Subscriber).OnMessage(job)
-               return true
-       })
-}
-
-func (g *Group) Subscribers(name string) Subscriber {
-       s, ok := g.subscribers.Get(name)
+func (g *Group) Member(name string) Subscriber {
+       s, ok := g.members.Get(name)
        if !ok {
                return nil
        }
        return s.(Subscriber)
 }
 
-func (g *Group) AddSubscriber(subscriber Subscriber) Subscriber {
-       return g.subscribers.PutIfAbsent(subscriber.ID(), 
subscriber).(Subscriber)
+func (g *Group) ForEach(iter func(m Subscriber)) {
+       g.members.ForEach(func(item util.MapItem) (next bool) {
+               iter(item.Value.(Subscriber))
+               return true
+       })
+}
+
+func (g *Group) AddMember(subscriber Subscriber) Subscriber {
+       return g.members.PutIfAbsent(subscriber.ID(), subscriber).(Subscriber)
 }
 
-func (g *Group) Remove(name string) {
-       g.subscribers.Remove(name)
+func (g *Group) RemoveMember(name string) {
+       g.members.Remove(name)
 }
 
 func (g *Group) Size() int {
-       return g.subscribers.Size()
+       return g.members.Size()
 }
 
 func NewGroup(name string) *Group {
        return &Group{
-               name:        name,
-               subscribers: util.NewConcurrentMap(0),
+               name:    name,
+               members: util.NewConcurrentMap(0),
        }
 }
diff --git a/pkg/notify/group_test.go b/pkg/event/subscriber_group_test.go
similarity index 79%
rename from pkg/notify/group_test.go
rename to pkg/event/subscriber_group_test.go
index 81040a7..b0dc22d 100644
--- a/pkg/notify/group_test.go
+++ b/pkg/event/subscriber_group_test.go
@@ -14,9 +14,11 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package notify
+package event
 
-import "testing"
+import (
+       "testing"
+)
 
 type mockSubscriber struct {
        Subscriber
@@ -34,37 +36,32 @@ func TestGroup_Add(t *testing.T) {
        if g.Name() != "g1" {
                t.Fatalf("TestGroup_Add failed")
        }
-       if g.AddSubscriber(m) != m {
+       if g.AddMember(m) != m {
                t.Fatalf("TestGroup_Add failed")
        }
-       if g.AddSubscriber(NewSubscriber(INSTANCE, "s1", "g1")) == m {
+       if g.AddMember(NewSubscriber(INSTANCE, "s1", "g1")) == m {
                t.Fatalf("TestGroup_Add failed")
        }
        same := *(m.(*baseSubscriber))
-       if g.AddSubscriber(&same) != m {
+       if g.AddMember(&same) != m {
                t.Fatalf("TestGroup_Add failed")
        }
        if g.Size() != 2 {
                t.Fatalf("TestGroup_Add failed")
        }
-       g.Remove(m.ID())
+       g.RemoveMember(m.ID())
        if g.Size() != 1 {
                t.Fatalf("TestGroup_Add failed")
        }
-       if g.Subscribers(m.ID()) == m {
+       if g.Member(m.ID()) == m {
                t.Fatalf("TestGroup_Add failed")
        }
 
        mock := &mockSubscriber{Subscriber: NewSubscriber(INSTANCE, "s1", "g1")}
-       if g.AddSubscriber(mock) != mock {
+       if g.AddMember(mock) != mock {
                t.Fatalf("TestGroup_Add failed")
        }
-       if g.Subscribers(mock.ID()) != mock {
-               t.Fatalf("TestGroup_Add failed")
-       }
-       job := &baseEvent{nType: INSTANCE}
-       g.Notify(job)
-       if mock.job != job {
+       if g.Member(mock.ID()) != mock {
                t.Fatalf("TestGroup_Add failed")
        }
 }
diff --git a/pkg/notify/types.go b/pkg/event/types.go
similarity index 96%
rename from pkg/notify/types.go
rename to pkg/event/types.go
index d6cba5d..d42377b 100644
--- a/pkg/notify/types.go
+++ b/pkg/event/types.go
@@ -13,7 +13,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-package notify
+package event
 
 import "strconv"
 
@@ -41,11 +41,11 @@ func (nt Type) IsValid() bool {
 }
 
 var typeNames = []string{
-       NOTIFTY: "NOTIFTY",
+       INNER: "INNER",
 }
 
 var typeQueues = []int{
-       NOTIFTY: 0,
+       INNER: 0,
 }
 
 func Types() (ts []Type) {
diff --git a/pkg/notify/types_test.go b/pkg/event/types_test.go
similarity index 93%
rename from pkg/notify/types_test.go
rename to pkg/event/types_test.go
index 720af81..35e5a66 100644
--- a/pkg/notify/types_test.go
+++ b/pkg/event/types_test.go
@@ -13,7 +13,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-package notify
+package event
 
 import "testing"
 
@@ -30,7 +30,7 @@ func TestRegisterType(t *testing.T) {
        if id.String() != "Type999" || id.QueueSize() != DefaultQueueSize {
                t.Fatal("TestRegisterType failed", id.String(), id.QueueSize())
        }
-       if NOTIFTY.String() != "NOTIFTY" || NOTIFTY.QueueSize() != 
DefaultQueueSize {
+       if INNER.String() != "INNER" || INNER.QueueSize() != DefaultQueueSize {
                t.Fatal("TestRegisterType failed", id.String(), id.QueueSize())
        }
 }
diff --git a/pkg/queue/taskqueue.go b/pkg/queue/taskqueue.go
index e59cec5..b394802 100644
--- a/pkg/queue/taskqueue.go
+++ b/pkg/queue/taskqueue.go
@@ -29,7 +29,7 @@ type Worker interface {
 }
 
 type Task struct {
-       Object interface{}
+       Payload interface{}
        // Async can let workers handle this task concurrently, but
        // it will make this task unordered
        Async bool
@@ -61,13 +61,13 @@ func (q *TaskQueue) Do(ctx context.Context, task Task) {
        if task.Async {
                for _, w := range q.Workers {
                        q.goroutine.Do(func(ctx context.Context) {
-                               q.dispatch(ctx, w, task.Object)
+                               q.dispatch(ctx, w, task.Payload)
                        })
                }
                return
        }
        for _, w := range q.Workers {
-               q.dispatch(ctx, w, task.Object)
+               q.dispatch(ctx, w, task.Payload)
        }
 }
 
diff --git a/pkg/queue/taskqueue_test.go b/pkg/queue/taskqueue_test.go
index 8c79da4..d0eecd6 100644
--- a/pkg/queue/taskqueue_test.go
+++ b/pkg/queue/taskqueue_test.go
@@ -34,26 +34,26 @@ func TestNewEventQueue(t *testing.T) {
        q := NewTaskQueue(0)
        q.AddWorker(h)
 
-       q.Do(context.Background(), Task{Object: 1})
+       q.Do(context.Background(), Task{Payload: 1})
        if <-h.Object != 1 {
                t.Fatalf("TestNewEventQueue failed")
        }
 
-       q.Do(context.Background(), Task{Object: 11, Async: true})
+       q.Do(context.Background(), Task{Payload: 11, Async: true})
        if <-h.Object != 11 {
                t.Fatalf("TestNewEventQueue failed")
        }
 
        q.Run()
-       q.Add(Task{Object: 2})
+       q.Add(Task{Payload: 2})
        if <-h.Object != 2 {
                t.Fatalf("TestNewEventQueue failed")
        }
 
-       q.Add(Task{Object: 22, Async: true})
+       q.Add(Task{Payload: 22, Async: true})
        if <-h.Object != 22 {
                t.Fatalf("TestNewEventQueue failed")
        }
        q.Stop()
-       q.Add(Task{Object: 3})
+       q.Add(Task{Payload: 3})
 }
diff --git a/pkg/notify/common.go b/server/alarm/center.go
similarity index 88%
copy from pkg/notify/common.go
copy to server/alarm/center.go
index c98e31c..9af6aa8 100644
--- a/pkg/notify/common.go
+++ b/server/alarm/center.go
@@ -15,12 +15,11 @@
  * limitations under the License.
  */
 
-package notify
+package alarm
 
-const (
-       DefaultQueueSize = 1000
-)
-
-const (
-       NOTIFTY Type = iota
-)
+func Center() *Service {
+       once.Do(func() {
+               service = NewAlarmService()
+       })
+       return service
+}
diff --git a/server/alarm/common.go b/server/alarm/common.go
index 9243d13..49f8fd0 100644
--- a/server/alarm/common.go
+++ b/server/alarm/common.go
@@ -17,7 +17,7 @@ package alarm
 
 import (
        "fmt"
-       "github.com/apache/servicecomb-service-center/pkg/notify"
+       "github.com/apache/servicecomb-service-center/pkg/event"
        "github.com/apache/servicecomb-service-center/server/alarm/model"
 )
 
@@ -40,7 +40,7 @@ const (
        Group   = "__ALARM_GROUP__"
 )
 
-var ALARM = notify.RegisterType("ALARM", 0)
+var ALARM = event.RegisterType("ALARM", 0)
 
 func FieldBool(key string, v bool) model.Field {
        return model.Field{Key: key, Value: v}
diff --git a/server/alarm/model/types.go b/server/alarm/model/types.go
index 0c29d79..40d0a44 100644
--- a/server/alarm/model/types.go
+++ b/server/alarm/model/types.go
@@ -16,7 +16,7 @@
 package model
 
 import (
-       nf "github.com/apache/servicecomb-service-center/pkg/notify"
+       nf "github.com/apache/servicecomb-service-center/pkg/event"
        "github.com/apache/servicecomb-service-center/pkg/util"
 )
 
diff --git a/server/alarm/service.go b/server/alarm/service.go
index e399b91..3ef5e07 100644
--- a/server/alarm/service.go
+++ b/server/alarm/service.go
@@ -16,11 +16,11 @@
 package alarm
 
 import (
+       nf "github.com/apache/servicecomb-service-center/pkg/event"
        "github.com/apache/servicecomb-service-center/pkg/log"
-       nf "github.com/apache/servicecomb-service-center/pkg/notify"
        "github.com/apache/servicecomb-service-center/pkg/util"
        "github.com/apache/servicecomb-service-center/server/alarm/model"
-       "github.com/apache/servicecomb-service-center/server/notify"
+       "github.com/apache/servicecomb-service-center/server/event"
        "sync"
 )
 
@@ -44,7 +44,7 @@ func (ac *Service) Raise(id model.ID, fields ...model.Field) 
error {
        for _, f := range fields {
                ae.Fields[f.Key] = f.Value
        }
-       return notify.GetNotifyCenter().Publish(ae)
+       return event.Center().Fire(ae)
 }
 
 func (ac *Service) Clear(id model.ID) error {
@@ -53,7 +53,7 @@ func (ac *Service) Clear(id model.ID) error {
                Status: Cleared,
                ID:     id,
        }
-       return notify.GetNotifyCenter().Publish(ae)
+       return event.Center().Fire(ae)
 }
 
 func (ac *Service) ListAll() (ls []*model.AlarmEvent) {
@@ -88,16 +88,9 @@ func NewAlarmService() *Service {
        c := &Service{
                Subscriber: nf.NewSubscriber(ALARM, Subject, Group),
        }
-       err := notify.GetNotifyCenter().AddSubscriber(c)
+       err := event.Center().AddSubscriber(c)
        if err != nil {
                log.Error("", err)
        }
        return c
 }
-
-func Center() *Service {
-       once.Do(func() {
-               service = NewAlarmService()
-       })
-       return service
-}
diff --git a/pkg/notify/common.go b/server/connection/connection.go
similarity index 75%
rename from pkg/notify/common.go
rename to server/connection/connection.go
index c98e31c..c34005f 100644
--- a/pkg/notify/common.go
+++ b/server/connection/connection.go
@@ -15,12 +15,14 @@
  * limitations under the License.
  */
 
-package notify
+// connection pkg impl the pub/sub mechanism of the long connection of diff 
protocols
+package connection
 
-const (
-       DefaultQueueSize = 1000
-)
+import "time"
 
 const (
-       NOTIFTY Type = iota
+       HeartbeatInterval = 30 * time.Second
+       ReadTimeout       = HeartbeatInterval * 4
+       SendTimeout       = 5 * time.Second
+       ReadMaxBody       = 64
 )
diff --git a/server/notify/stream.go b/server/connection/grpc/stream.go
similarity index 56%
rename from server/notify/stream.go
rename to server/connection/grpc/stream.go
index f382d19..23c56a3 100644
--- a/server/notify/stream.go
+++ b/server/connection/grpc/stream.go
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package notify
+package grpc
 
 import (
        "context"
@@ -23,22 +23,23 @@ import (
        "github.com/apache/servicecomb-service-center/pkg/log"
        pb "github.com/apache/servicecomb-service-center/pkg/registry"
        "github.com/apache/servicecomb-service-center/pkg/util"
-       apt "github.com/apache/servicecomb-service-center/server/core"
+       "github.com/apache/servicecomb-service-center/server/connection"
        "github.com/apache/servicecomb-service-center/server/core/proto"
+       "github.com/apache/servicecomb-service-center/server/event"
        "time"
 )
 
-func HandleWatchJob(watcher *InstanceEventListWatcher, stream 
proto.ServiceInstanceCtrl_WatchServer) (err error) {
-       timer := time.NewTimer(HeartbeatInterval)
+const GRPC = "gRPC"
+
+func Handle(watcher *event.InstanceEventListWatcher, stream 
proto.ServiceInstanceCtrl_WatchServer) (err error) {
+       timer := time.NewTimer(connection.HeartbeatInterval)
        defer timer.Stop()
        for {
                select {
                case <-stream.Context().Done():
                        return
                case <-timer.C:
-                       timer.Reset(HeartbeatInterval)
-
-                       // TODO grpc 长连接心跳?
+                       timer.Reset(connection.HeartbeatInterval)
                case job := <-watcher.Job:
                        if job == nil {
                                err = errors.New("channel is closed")
@@ -46,37 +47,36 @@ func HandleWatchJob(watcher *InstanceEventListWatcher, 
stream proto.ServiceInsta
                                        watcher.Subject(), watcher.Group())
                                return
                        }
-                       if job.Response != nil {
-                               resp := job.Response
-                               log.Infof("event is coming in, watcher, 
subject: %s, group: %s",
-                                       watcher.Subject(), watcher.Group())
+                       if job.Response == nil {
+                               continue
+                       }
+                       resp := job.Response
+                       log.Infof("event is coming in, watcher, subject: %s, 
group: %s",
+                               watcher.Subject(), watcher.Group())
 
-                               err = stream.Send(resp)
-                               if job != nil {
-                                       ReportPublishCompleted(job, err)
-                               }
-                               if err != nil {
-                                       log.Errorf(err, "send message error, 
subject: %s, group: %s",
-                                               watcher.Subject(), 
watcher.Group())
-                                       watcher.SetError(err)
-                                       return
-                               }
-                               util.ResetTimer(timer, HeartbeatInterval)
+                       err = stream.Send(resp)
+                       connection.ReportPublishCompleted(job, err)
+                       if err != nil {
+                               log.Errorf(err, "send message error, subject: 
%s, group: %s",
+                                       watcher.Subject(), watcher.Group())
+                               watcher.SetError(err)
+                               return
                        }
+                       util.ResetTimer(timer, connection.HeartbeatInterval)
                }
        }
 }
 
-func DoStreamListAndWatch(ctx context.Context, serviceID string, f func() 
([]*pb.WatchInstanceResponse, int64), stream 
proto.ServiceInstanceCtrl_WatchServer) (err error) {
+func ListAndWatch(ctx context.Context, serviceID string, f func() 
([]*pb.WatchInstanceResponse, int64), stream 
proto.ServiceInstanceCtrl_WatchServer) (err error) {
        domainProject := util.ParseDomainProject(ctx)
        domain := util.ParseDomain(ctx)
-       watcher := NewInstanceEventListWatcher(serviceID, 
apt.GetInstanceRootKey(domainProject)+"/", f)
-       err = GetNotifyCenter().AddSubscriber(watcher)
+       watcher := event.NewInstanceEventListWatcher(serviceID, domainProject, 
f)
+       err = event.Center().AddSubscriber(watcher)
        if err != nil {
                return
        }
-       ReportSubscriber(domain, GRPC, 1)
-       err = HandleWatchJob(watcher, stream)
-       ReportSubscriber(domain, GRPC, -1)
+       connection.ReportSubscriber(domain, GRPC, 1)
+       err = Handle(watcher, stream)
+       connection.ReportSubscriber(domain, GRPC, -1)
        return
 }
diff --git a/server/notify/stream_test.go 
b/server/connection/grpc/stream_test.go
similarity index 77%
rename from server/notify/stream_test.go
rename to server/connection/grpc/stream_test.go
index 6d92cb5..b20fa5d 100644
--- a/server/notify/stream_test.go
+++ b/server/connection/grpc/stream_test.go
@@ -14,13 +14,15 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package notify
+package grpc_test
 
 import (
        "context"
        "github.com/apache/servicecomb-service-center/pkg/log"
        pb "github.com/apache/servicecomb-service-center/pkg/registry"
        simple "github.com/apache/servicecomb-service-center/pkg/time"
+       stream 
"github.com/apache/servicecomb-service-center/server/connection/grpc"
+       "github.com/apache/servicecomb-service-center/server/event"
        "google.golang.org/grpc"
        "testing"
        "time"
@@ -39,19 +41,19 @@ func (x *grpcWatchServer) Context() context.Context {
 }
 
 func TestHandleWatchJob(t *testing.T) {
-       w := NewInstanceEventListWatcher("g", "s", nil)
+       w := event.NewInstanceEventListWatcher("g", "s", nil)
        w.Job <- nil
-       err := HandleWatchJob(w, &grpcWatchServer{})
+       err := stream.Handle(w, &grpcWatchServer{})
        if err == nil {
                t.Fatalf("TestHandleWatchJob failed")
        }
-       w.Job <- NewInstanceEventWithTime("g", "s", 1, 
simple.FromTime(time.Now()), nil)
+       w.Job <- event.NewInstanceEventWithTime("g", "s", 1, 
simple.FromTime(time.Now()), nil)
        w.Job <- nil
-       HandleWatchJob(w, &grpcWatchServer{})
+       stream.Handle(w, &grpcWatchServer{})
 }
 
 func TestDoStreamListAndWatch(t *testing.T) {
        defer log.Recover()
-       err := DoStreamListAndWatch(context.Background(), "s", nil, nil)
+       err := stream.ListAndWatch(context.Background(), "s", nil, nil)
        t.Fatal("TestDoStreamListAndWatch failed", err)
 }
diff --git a/server/notify/metrics.go b/server/connection/metrics.go
similarity index 65%
rename from server/notify/metrics.go
rename to server/connection/metrics.go
index 6bde05b..76aa41e 100644
--- a/server/notify/metrics.go
+++ b/server/connection/metrics.go
@@ -1,22 +1,24 @@
-// Licensed to the Apache Software Foundation (ASF) under one or more
-// contributor license agreements.  See the NOTICE file distributed with
-// this work for additional information regarding copyright ownership.
-// The ASF licenses this file to You under the Apache License, Version 2.0
-// (the "License"); you may not use this file except in compliance with
-// the License.  You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 
-package notify
+package connection
 
 import (
-       "github.com/apache/servicecomb-service-center/pkg/notify"
+       "github.com/apache/servicecomb-service-center/pkg/event"
        "github.com/apache/servicecomb-service-center/server/metric"
        "github.com/prometheus/client_golang/prometheus"
        "time"
@@ -58,7 +60,7 @@ func init() {
        prometheus.MustRegister(notifyCounter, notifyLatency, subscriberGauge)
 }
 
-func ReportPublishCompleted(evt notify.Event, err error) {
+func ReportPublishCompleted(evt event.Event, err error) {
        instance := metric.InstanceName()
        elapsed := float64(time.Since(evt.CreateAt()).Nanoseconds()) / 
float64(time.Microsecond)
        status := success
diff --git a/server/notify/publisher.go b/server/connection/ws/publisher.go
similarity index 96%
rename from server/notify/publisher.go
rename to server/connection/ws/publisher.go
index aa1c662..a8ee692 100644
--- a/server/notify/publisher.go
+++ b/server/connection/ws/publisher.go
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package notify
+package ws
 
 import (
        "context"
@@ -47,7 +47,7 @@ func (wh *Publisher) Stop() {
 
 func (wh *Publisher) dispatch(ws *WebSocket, payload interface{}) {
        wh.goroutine.Do(func(ctx context.Context) {
-               ws.HandleWatchWebSocketJob(payload)
+               ws.HandleEvent(payload)
        })
 }
 
@@ -102,3 +102,7 @@ func NewPublisher() *Publisher {
                goroutine: gopool.New(context.Background()),
        }
 }
+
+func Instance() *Publisher {
+       return publisher
+}
diff --git a/server/notify/websocket.go b/server/connection/ws/websocket.go
similarity index 84%
rename from server/notify/websocket.go
rename to server/connection/ws/websocket.go
index d31eb8d..d1d01a9 100644
--- a/server/notify/websocket.go
+++ b/server/connection/ws/websocket.go
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package notify
+package ws
 
 import (
        "context"
@@ -24,24 +24,28 @@ import (
        "github.com/apache/servicecomb-service-center/pkg/log"
        pb "github.com/apache/servicecomb-service-center/pkg/registry"
        "github.com/apache/servicecomb-service-center/pkg/util"
+       "github.com/apache/servicecomb-service-center/server/connection"
+       "github.com/apache/servicecomb-service-center/server/event"
        serviceUtil 
"github.com/apache/servicecomb-service-center/server/service/util"
        "github.com/gorilla/websocket"
        "time"
 )
 
+const Websocket = "Websocket"
+
 type WebSocket struct {
        ctx    context.Context
        ticker *time.Ticker
        conn   *websocket.Conn
        // watcher subscribe the notification service event
-       watcher         *InstanceEventListWatcher
+       watcher         *event.InstanceEventListWatcher
        needPingWatcher bool
        free            chan struct{}
        closed          chan struct{}
 }
 
 func (wh *WebSocket) Init() error {
-       wh.ticker = time.NewTicker(HeartbeatInterval)
+       wh.ticker = time.NewTicker(connection.HeartbeatInterval)
        wh.needPingWatcher = true
        wh.free = make(chan struct{}, 1)
        wh.closed = make(chan struct{})
@@ -51,7 +55,7 @@ func (wh *WebSocket) Init() error {
        remoteAddr := wh.conn.RemoteAddr().String()
 
        // put in notification service queue
-       if err := GetNotifyCenter().AddSubscriber(wh.watcher); err != nil {
+       if err := event.Center().AddSubscriber(wh.watcher); err != nil {
                err = fmt.Errorf("establish[%s] websocket watch failed: notify 
service error, %s",
                        remoteAddr, err.Error())
                log.Errorf(nil, err.Error())
@@ -64,7 +68,7 @@ func (wh *WebSocket) Init() error {
        }
 
        // put in publisher queue
-       publisher.Accept(wh)
+       Instance().Accept(wh)
 
        log.Debugf("start watching instance status, watcher[%s], subject: %s, 
group: %s",
                remoteAddr, wh.watcher.Subject(), wh.watcher.Group())
@@ -72,14 +76,14 @@ func (wh *WebSocket) Init() error {
 }
 
 func (wh *WebSocket) ReadTimeout() time.Duration {
-       return ReadTimeout
+       return connection.ReadTimeout
 }
 
 func (wh *WebSocket) SendTimeout() time.Duration {
-       return SendTimeout
+       return connection.SendTimeout
 }
 
-func (wh *WebSocket) heartbeat(messageType int) error {
+func (wh *WebSocket) Heartbeat(messageType int) error {
        err := wh.conn.WriteControl(messageType, []byte{}, 
time.Now().Add(wh.SendTimeout()))
        if err != nil {
                messageTypeName := "Ping"
@@ -94,7 +98,7 @@ func (wh *WebSocket) heartbeat(messageType int) error {
        return nil
 }
 
-func (wh *WebSocket) HandleWatchWebSocketControlMessage() {
+func (wh *WebSocket) HandleControlMessage() {
        remoteAddr := wh.conn.RemoteAddr().String()
        // PING
        wh.conn.SetPingHandler(func(message string) error {
@@ -109,7 +113,7 @@ func (wh *WebSocket) HandleWatchWebSocketControlMessage() {
                                message, remoteAddr, wh.watcher.Subject(), 
wh.watcher.Group())
                }
                wh.needPingWatcher = false
-               return wh.heartbeat(websocket.PongMessage)
+               return wh.Heartbeat(websocket.PongMessage)
        })
        // PONG
        wh.conn.SetPongHandler(func(message string) error {
@@ -130,7 +134,7 @@ func (wh *WebSocket) HandleWatchWebSocketControlMessage() {
                return wh.sendClose(code, text)
        })
 
-       wh.conn.SetReadLimit(ReadMaxBody)
+       wh.conn.SetReadLimit(connection.ReadMaxBody)
        err := wh.conn.SetReadDeadline(time.Now().Add(wh.ReadTimeout()))
        if err != nil {
                log.Error("", err)
@@ -186,12 +190,11 @@ func (wh *WebSocket) Pick() interface{} {
        return nil
 }
 
-// HandleWatchWebSocketJob will be called if Pick() returns not nil
-func (wh *WebSocket) HandleWatchWebSocketJob(o interface{}) {
+// HandleEvent will be called if Pick() returns not nil
+func (wh *WebSocket) HandleEvent(o interface{}) {
        defer wh.SetReady()
 
        var (
-               job        *InstanceEvent
                message    []byte
                remoteAddr = wh.conn.RemoteAddr().String()
        )
@@ -213,7 +216,7 @@ func (wh *WebSocket) HandleWatchWebSocketJob(o interface{}) 
{
                        return
                }
 
-               if err := wh.heartbeat(websocket.PingMessage); err != nil {
+               if err := wh.Heartbeat(websocket.PingMessage); err != nil {
                        log.Errorf(err, "send 'Ping' message to watcher[%s] 
failed, subject: %s, group: %s",
                                remoteAddr, wh.watcher.Subject(), 
wh.watcher.Group())
                        return
@@ -222,7 +225,7 @@ func (wh *WebSocket) HandleWatchWebSocketJob(o interface{}) 
{
                log.Debugf("send 'Ping' message to watcher[%s], subject: %s, 
group: %s",
                        remoteAddr, wh.watcher.Subject(), wh.watcher.Group())
                return
-       case *InstanceEvent:
+       case *event.InstanceEvent:
                resp := o.Response
 
                providerFlag := fmt.Sprintf("%s/%s/%s", resp.Key.AppId, 
resp.Key.ServiceName, resp.Key.Version)
@@ -254,8 +257,8 @@ func (wh *WebSocket) HandleWatchWebSocketJob(o interface{}) 
{
        }
 
        err := wh.WriteMessage(message)
-       if job != nil {
-               ReportPublishCompleted(job, err)
+       if evt, ok := o.(*event.InstanceEvent); ok {
+               connection.ReportPublishCompleted(evt, err)
        }
        if err != nil {
                log.Errorf(err, "watcher[%s] catch an err, subject: %s, group: 
%s",
@@ -286,18 +289,14 @@ func (wh *WebSocket) Stop() {
        close(wh.closed)
 }
 
-func DoWebSocketListAndWatch(ctx context.Context, serviceID string, f func() 
([]*pb.WatchInstanceResponse, int64), conn *websocket.Conn) {
+func ListAndWatch(ctx context.Context, serviceID string, f func() 
([]*pb.WatchInstanceResponse, int64), conn *websocket.Conn) {
        domainProject := util.ParseDomainProject(ctx)
        domain := util.ParseDomain(ctx)
-       socket := &WebSocket{
-               ctx:     ctx,
-               conn:    conn,
-               watcher: NewInstanceEventListWatcher(serviceID, domainProject, 
f),
-       }
+       socket := New(ctx, conn, event.NewInstanceEventListWatcher(serviceID, 
domainProject, f))
 
-       ReportSubscriber(domain, Websocket, 1)
+       connection.ReportSubscriber(domain, Websocket, 1)
        process(socket)
-       ReportSubscriber(domain, Websocket, -1)
+       connection.ReportSubscriber(domain, Websocket, -1)
 }
 
 func process(socket *WebSocket) {
@@ -305,15 +304,23 @@ func process(socket *WebSocket) {
                return
        }
 
-       socket.HandleWatchWebSocketControlMessage()
+       socket.HandleControlMessage()
 
        socket.Stop()
 }
 
-func EstablishWebSocketError(conn *websocket.Conn, err error) {
+func SendEstablishError(conn *websocket.Conn, err error) {
        remoteAddr := conn.RemoteAddr().String()
        log.Errorf(err, "establish[%s] websocket watch failed.", remoteAddr)
        if err := conn.WriteMessage(websocket.TextMessage, 
util.StringToBytesWithNoCopy(err.Error())); err != nil {
                log.Errorf(err, "establish[%s] websocket watch failed: write 
message failed.", remoteAddr)
        }
 }
+
+func New(ctx context.Context, conn *websocket.Conn, watcher 
*event.InstanceEventListWatcher) *WebSocket {
+       return &WebSocket{
+               ctx:     ctx,
+               conn:    conn,
+               watcher: watcher,
+       }
+}
diff --git a/server/notify/websocket_test.go 
b/server/connection/ws/websocket_test.go
similarity index 71%
rename from server/notify/websocket_test.go
rename to server/connection/ws/websocket_test.go
index d9059f9..b7895eb 100644
--- a/server/notify/websocket_test.go
+++ b/server/connection/ws/websocket_test.go
@@ -14,16 +14,18 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package notify
+package ws_test
 
+// initialize
+import _ "github.com/apache/servicecomb-service-center/test"
 import (
        "context"
        "errors"
        "github.com/apache/servicecomb-service-center/pkg/registry"
+       wss "github.com/apache/servicecomb-service-center/server/connection/ws"
        "github.com/apache/servicecomb-service-center/server/core"
        "github.com/apache/servicecomb-service-center/server/core/proto"
-       _ 
"github.com/apache/servicecomb-service-center/server/plugin/discovery/etcd"
-       _ 
"github.com/apache/servicecomb-service-center/server/plugin/registry/buildin"
+       "github.com/apache/servicecomb-service-center/server/event"
        "github.com/gorilla/websocket"
        "net/http"
        "net/http/httptest"
@@ -64,9 +66,9 @@ func TestDoWebSocketListAndWatch(t *testing.T) {
        conn, _, _ := websocket.DefaultDialer.Dial(
                strings.Replace(s.URL, "http://";, "ws://", 1), nil)
 
-       EstablishWebSocketError(conn, errors.New("error"))
+       wss.SendEstablishError(conn, errors.New("error"))
 
-       w := NewInstanceEventListWatcher("g", "s", func() (results 
[]*registry.WatchInstanceResponse, rev int64) {
+       w := event.NewInstanceEventListWatcher("g", "s", func() (results 
[]*registry.WatchInstanceResponse, rev int64) {
                results = append(results, &registry.WatchInstanceResponse{
                        Response: proto.CreateResponse(proto.Response_SUCCESS, 
"ok"),
                        Action:   string(registry.EVT_CREATE),
@@ -76,41 +78,33 @@ func TestDoWebSocketListAndWatch(t *testing.T) {
                return
        })
 
-       ws := &WebSocket{
-               ctx:     context.Background(),
-               conn:    conn,
-               watcher: w,
-       }
+       ws := wss.New(context.Background(), conn, w)
        err := ws.Init()
        if err != nil {
                t.Fatalf("TestPublisher_Run")
        }
 
-       GetNotifyCenter().Start()
+       event.Center().Start()
 
        go func() {
-               DoWebSocketListAndWatch(context.Background(), "", nil, conn)
+               wss.ListAndWatch(context.Background(), "", nil, conn)
 
-               w2 := NewInstanceEventListWatcher("g", "s", func() (results 
[]*registry.WatchInstanceResponse, rev int64) {
+               w2 := event.NewInstanceEventListWatcher("g", "s", func() 
(results []*registry.WatchInstanceResponse, rev int64) {
                        return
                })
-               ws2 := &WebSocket{
-                       ctx:     context.Background(),
-                       conn:    conn,
-                       watcher: w2,
-               }
+               ws2 := wss.New(context.Background(), conn, w2)
                err := ws2.Init()
                if err != nil {
                        t.Fatalf("TestPublisher_Run")
                }
        }()
 
-       go ws.HandleWatchWebSocketControlMessage()
+       go ws.HandleControlMessage()
 
        w.OnMessage(nil)
-       w.OnMessage(&InstanceEvent{})
+       w.OnMessage(&event.InstanceEvent{})
 
-       GetNotifyCenter().Publish(NewInstanceEvent("g", "s", 1, 
&registry.WatchInstanceResponse{
+       event.Center().Fire(event.NewInstanceEvent("g", "s", 1, 
&registry.WatchInstanceResponse{
                Response: proto.CreateResponse(proto.Response_SUCCESS, "ok"),
                Action:   string(registry.EVT_CREATE),
                Key:      &registry.MicroServiceKey{},
@@ -119,21 +113,21 @@ func TestDoWebSocketListAndWatch(t *testing.T) {
 
        <-time.After(time.Second)
 
-       ws.HandleWatchWebSocketJob(nil)
+       ws.HandleEvent(nil)
 
-       ws.heartbeat(websocket.PingMessage)
-       ws.heartbeat(websocket.PongMessage)
+       ws.Heartbeat(websocket.PingMessage)
+       ws.Heartbeat(websocket.PongMessage)
 
-       ws.HandleWatchWebSocketJob(time.Now())
+       ws.HandleEvent(time.Now())
 
        closeCh <- struct{}{}
 
        <-time.After(time.Second)
 
-       ws.heartbeat(websocket.PingMessage)
-       ws.heartbeat(websocket.PongMessage)
+       ws.Heartbeat(websocket.PingMessage)
+       ws.Heartbeat(websocket.PongMessage)
 
        w.OnMessage(nil)
 
-       publisher.Stop()
+       wss.Instance().Stop()
 }
diff --git a/server/notify/center.go b/server/event/center.go
similarity index 69%
rename from server/notify/center.go
rename to server/event/center.go
index 4a0ad80..732a762 100644
--- a/server/notify/center.go
+++ b/server/event/center.go
@@ -13,19 +13,21 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-package notify
+package event
 
 import (
-       "github.com/apache/servicecomb-service-center/pkg/notify"
+       "github.com/apache/servicecomb-service-center/pkg/event"
 )
 
-var INSTANCE = notify.RegisterType("INSTANCE", InstanceEventQueueSize)
-var notifyService *notify.Service
+var busService *event.BusService
 
 func init() {
-       notifyService = notify.NewNotifyService()
+       busService = event.NewBusService()
 }
 
-func GetNotifyCenter() *notify.Service {
-       return notifyService
+// Center handle diff types of events
+// event type can be 'ALARM'(biz alarms), 'RESOURCE'(resource changes, like 
INSTANCE) or
+// inner type 'NOTIFY'(subscriber health check)
+func Center() *event.BusService {
+       return busService
 }
diff --git a/server/notify/listwatcher.go b/server/event/instance_subscriber.go
similarity index 83%
rename from server/notify/listwatcher.go
rename to server/event/instance_subscriber.go
index 6de175f..e13d0e9 100644
--- a/server/notify/listwatcher.go
+++ b/server/event/instance_subscriber.go
@@ -15,27 +15,34 @@
  * limitations under the License.
  */
 
-package notify
+package event
 
 import (
        "context"
+       "github.com/apache/servicecomb-service-center/pkg/event"
        "github.com/apache/servicecomb-service-center/pkg/gopool"
        "github.com/apache/servicecomb-service-center/pkg/log"
-       "github.com/apache/servicecomb-service-center/pkg/notify"
        pb "github.com/apache/servicecomb-service-center/pkg/registry"
        simple "github.com/apache/servicecomb-service-center/pkg/time"
        "time"
 )
 
+const (
+       AddJobTimeout  = 1 * time.Second
+       EventQueueSize = 5000
+)
+
+var INSTANCE = event.RegisterType("INSTANCE", EventQueueSize)
+
 // 状态变化推送
 type InstanceEvent struct {
-       notify.Event
+       event.Event
        Revision int64
        Response *pb.WatchInstanceResponse
 }
 
 type InstanceEventListWatcher struct {
-       notify.Subscriber
+       event.Subscriber
        Job          chan *InstanceEvent
        ListRevision int64
        ListFunc     func() (results []*pb.WatchInstanceResponse, rev int64)
@@ -45,7 +52,7 @@ type InstanceEventListWatcher struct {
 func (w *InstanceEventListWatcher) SetError(err error) {
        w.Subscriber.SetError(err)
        // 触发清理job
-       e := w.Service().Publish(notify.NewNotifyServiceHealthCheckJob(w))
+       e := w.Bus().Fire(event.NewUnhealthyEvent(w))
        if e != nil {
                log.Error("", e)
        }
@@ -55,7 +62,7 @@ func (w *InstanceEventListWatcher) OnAccept() {
        if w.Err() != nil {
                return
        }
-       log.Debugf("accepted by notify service, %s watcher %s %s", w.Type(), 
w.Group(), w.Subject())
+       log.Debugf("accepted by event service, %s watcher %s %s", w.Type(), 
w.Group(), w.Subject())
        gopool.Go(w.listAndPublishJobs)
 }
 
@@ -72,7 +79,7 @@ func (w *InstanceEventListWatcher) listAndPublishJobs(_ 
context.Context) {
 }
 
 //被通知
-func (w *InstanceEventListWatcher) OnMessage(job notify.Event) {
+func (w *InstanceEventListWatcher) OnMessage(job event.Event) {
        if w.Err() != nil {
                return
        }
@@ -97,7 +104,7 @@ func (w *InstanceEventListWatcher) OnMessage(job 
notify.Event) {
        }
 
        if wJob.Revision <= w.ListRevision {
-               log.Warnf("unexpected notify %s job is coming in, watcher %s 
%s, job is %v, current revision is %v",
+               log.Warnf("unexpected event %s job is coming in, watcher %s %s, 
job is %v, current revision is %v",
                        w.Type(), w.Group(), w.Subject(), job, w.ListRevision)
                return
        }
@@ -131,7 +138,7 @@ func (w *InstanceEventListWatcher) Close() {
 
 func NewInstanceEvent(serviceID, domainProject string, rev int64, response 
*pb.WatchInstanceResponse) *InstanceEvent {
        return &InstanceEvent{
-               Event:    notify.NewEvent(INSTANCE, domainProject, serviceID),
+               Event:    event.NewEvent(INSTANCE, domainProject, serviceID),
                Revision: rev,
                Response: response,
        }
@@ -139,7 +146,7 @@ func NewInstanceEvent(serviceID, domainProject string, rev 
int64, response *pb.W
 
 func NewInstanceEventWithTime(serviceID, domainProject string, rev int64, 
createAt simple.Time, response *pb.WatchInstanceResponse) *InstanceEvent {
        return &InstanceEvent{
-               Event:    notify.NewEventWithTime(INSTANCE, domainProject, 
serviceID, createAt),
+               Event:    event.NewEventWithTime(INSTANCE, domainProject, 
serviceID, createAt),
                Revision: rev,
                Response: response,
        }
@@ -148,7 +155,7 @@ func NewInstanceEventWithTime(serviceID, domainProject 
string, rev int64, create
 func NewInstanceEventListWatcher(serviceID, domainProject string,
        listFunc func() (results []*pb.WatchInstanceResponse, rev int64)) 
*InstanceEventListWatcher {
        watcher := &InstanceEventListWatcher{
-               Subscriber: notify.NewSubscriber(INSTANCE, domainProject, 
serviceID),
+               Subscriber: event.NewSubscriber(INSTANCE, domainProject, 
serviceID),
                Job:        make(chan *InstanceEvent, INSTANCE.QueueSize()),
                ListFunc:   listFunc,
                listCh:     make(chan struct{}),
diff --git a/server/health/health_test.go b/server/health/health_test.go
index afd5786..804df1e 100644
--- a/server/health/health_test.go
+++ b/server/health/health_test.go
@@ -17,13 +17,13 @@ package health
 
 import (
        "github.com/apache/servicecomb-service-center/server/alarm"
-       "github.com/apache/servicecomb-service-center/server/notify"
+       "github.com/apache/servicecomb-service-center/server/event"
        "testing"
        "time"
 )
 
 func TestDefaultHealthChecker_Healthy(t *testing.T) {
-       notify.GetNotifyCenter().Start()
+       event.Center().Start()
 
        // normal case
        var hc DefaultHealthChecker
diff --git a/server/notify/common.go b/server/notify/common.go
deleted file mode 100644
index d0db027..0000000
--- a/server/notify/common.go
+++ /dev/null
@@ -1,29 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one or more
-// contributor license agreements.  See the NOTICE file distributed with
-// this work for additional information regarding copyright ownership.
-// The ASF licenses this file to You under the Apache License, Version 2.0
-// (the "License"); you may not use this file except in compliance with
-// the License.  You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package notify
-
-import "time"
-
-const (
-       AddJobTimeout          = 1 * time.Second
-       HeartbeatInterval      = 30 * time.Second
-       ReadTimeout            = HeartbeatInterval * 4
-       SendTimeout            = 5 * time.Second
-       InstanceEventQueueSize = 5000
-       ReadMaxBody            = 64
-       Websocket              = "Websocket"
-       GRPC                   = "gRPC"
-)
diff --git a/server/plugin/discovery/k8s/adaptor/listwatcher.go 
b/server/plugin/discovery/k8s/adaptor/listwatcher.go
index 520d400..02874d0 100644
--- a/server/plugin/discovery/k8s/adaptor/listwatcher.go
+++ b/server/plugin/discovery/k8s/adaptor/listwatcher.go
@@ -59,16 +59,16 @@ func NewListWatcher(t K8sType, lister 
cache.SharedIndexInformer, f OnEventFunc)
        lw.AddEventHandler(
                cache.ResourceEventHandlerFuncs{
                        AddFunc: func(obj interface{}) {
-                               Queue(t).Add(queue.Task{Object: 
K8sEvent{EventType: pb.EVT_CREATE, Object: obj}})
+                               Queue(t).Add(queue.Task{Payload: 
K8sEvent{EventType: pb.EVT_CREATE, Object: obj}})
                        },
                        UpdateFunc: func(old, new interface{}) {
                                if !reflect.DeepEqual(old, new) {
-                                       Queue(t).Add(queue.Task{Object: 
K8sEvent{EventType: pb.EVT_UPDATE, Object: new,
+                                       Queue(t).Add(queue.Task{Payload: 
K8sEvent{EventType: pb.EVT_UPDATE, Object: new,
                                                PrevObject: old}})
                                }
                        },
                        DeleteFunc: func(obj interface{}) {
-                               Queue(t).Add(queue.Task{Object: 
K8sEvent{EventType: pb.EVT_DELETE, Object: obj}})
+                               Queue(t).Add(queue.Task{Payload: 
K8sEvent{EventType: pb.EVT_DELETE, Object: obj}})
                        },
                })
        Queue(t).AddWorker(lw)
diff --git a/server/server.go b/server/server.go
index de22449..c724de0 100644
--- a/server/server.go
+++ b/server/server.go
@@ -28,13 +28,13 @@ import (
        "time"
 
        "context"
+       nf "github.com/apache/servicecomb-service-center/pkg/event"
        "github.com/apache/servicecomb-service-center/pkg/gopool"
        "github.com/apache/servicecomb-service-center/pkg/log"
-       nf "github.com/apache/servicecomb-service-center/pkg/notify"
        "github.com/apache/servicecomb-service-center/server/core"
        "github.com/apache/servicecomb-service-center/server/core/backend"
+       "github.com/apache/servicecomb-service-center/server/event"
        "github.com/apache/servicecomb-service-center/server/mux"
-       "github.com/apache/servicecomb-service-center/server/notify"
        "github.com/apache/servicecomb-service-center/server/plugin"
        serviceUtil 
"github.com/apache/servicecomb-service-center/server/service/util"
        "github.com/apache/servicecomb-service-center/server/task"
@@ -51,10 +51,10 @@ func Run() {
 }
 
 type ServiceCenterServer struct {
-       apiService    *APIServer
-       notifyService *nf.Service
-       cacheService  *backend.KvStore
-       goroutine     *gopool.Pool
+       apiService   *APIServer
+       eventCenter  *nf.BusService
+       cacheService *backend.KvStore
+       goroutine    *gopool.Pool
 }
 
 func (s *ServiceCenterServer) Run() {
@@ -187,13 +187,13 @@ func (s *ServiceCenterServer) clearNoInstanceServices() {
 func (s *ServiceCenterServer) initialize() {
        s.cacheService = backend.Store()
        s.apiService = GetAPIServer()
-       s.notifyService = notify.GetNotifyCenter()
+       s.eventCenter = event.Center()
        s.goroutine = gopool.New(context.Background())
 }
 
 func (s *ServiceCenterServer) startServices() {
        // notifications
-       s.notifyService.Start()
+       s.eventCenter.Start()
 
        // load server plugins
        plugin.LoadPlugins()
@@ -240,8 +240,8 @@ func (s *ServiceCenterServer) Stop() {
                s.apiService.Stop()
        }
 
-       if s.notifyService != nil {
-               s.notifyService.Stop()
+       if s.eventCenter != nil {
+               s.eventCenter.Stop()
        }
 
        if s.cacheService != nil {
diff --git a/server/service/event/instance_event_handler.go 
b/server/service/event/instance_event_handler.go
index c355760..0d6010d 100644
--- a/server/service/event/instance_event_handler.go
+++ b/server/service/event/instance_event_handler.go
@@ -24,7 +24,7 @@ import (
        apt "github.com/apache/servicecomb-service-center/server/core"
        "github.com/apache/servicecomb-service-center/server/core/backend"
        "github.com/apache/servicecomb-service-center/server/core/proto"
-       "github.com/apache/servicecomb-service-center/server/notify"
+       "github.com/apache/servicecomb-service-center/server/event"
        "github.com/apache/servicecomb-service-center/server/plugin/discovery"
        "github.com/apache/servicecomb-service-center/server/service/cache"
        "github.com/apache/servicecomb-service-center/server/service/metrics"
@@ -82,7 +82,7 @@ func (h *InstanceEventHandler) OnEvent(evt discovery.KvEvent) 
{
                }
        }
 
-       if notify.GetNotifyCenter().Closed() {
+       if event.Center().Closed() {
                log.Warnf("caught [%s] instance[%s/%s] event, endpoints %v, but 
notify service is closed",
                        action, providerID, providerInstanceID, 
instance.Endpoints)
                return
@@ -136,10 +136,10 @@ func PublishInstanceEvent(evt discovery.KvEvent, 
domainProject string, serviceKe
        }
        for _, consumerID := range subscribers {
                // TODO add超时怎么处理?
-               job := notify.NewInstanceEventWithTime(consumerID, 
domainProject, evt.Revision, evt.CreateAt, response)
-               err := notify.GetNotifyCenter().Publish(job)
+               evt := event.NewInstanceEventWithTime(consumerID, 
domainProject, evt.Revision, evt.CreateAt, response)
+               err := event.Center().Fire(evt)
                if err != nil {
-                       log.Errorf(err, "publish job failed")
+                       log.Errorf(err, "publish event[%v] into channel 
failed", evt)
                }
        }
 }
diff --git a/server/service/event/rule_event_handler.go 
b/server/service/event/rule_event_handler.go
index dedf31e..d719800 100644
--- a/server/service/event/rule_event_handler.go
+++ b/server/service/event/rule_event_handler.go
@@ -26,7 +26,7 @@ import (
        "github.com/apache/servicecomb-service-center/server/core"
        "github.com/apache/servicecomb-service-center/server/core/backend"
        "github.com/apache/servicecomb-service-center/server/core/proto"
-       "github.com/apache/servicecomb-service-center/server/notify"
+       "github.com/apache/servicecomb-service-center/server/event"
        "github.com/apache/servicecomb-service-center/server/plugin/discovery"
        serviceUtil 
"github.com/apache/servicecomb-service-center/server/service/util"
 )
@@ -98,7 +98,7 @@ func (h *RuleEventHandler) OnEvent(evt discovery.KvEvent) {
        }
 
        providerID, ruleID, domainProject := core.GetInfoFromRuleKV(evt.KV.Key)
-       if notify.GetNotifyCenter().Closed() {
+       if event.Center().Closed() {
                log.Warnf("caught [%s] service rule[%s/%s] event, but notify 
service is closed",
                        action, providerID, ruleID)
                return
diff --git a/server/service/event/tag_event_handler.go 
b/server/service/event/tag_event_handler.go
index b66fed9..7416f2d 100644
--- a/server/service/event/tag_event_handler.go
+++ b/server/service/event/tag_event_handler.go
@@ -26,7 +26,7 @@ import (
        "github.com/apache/servicecomb-service-center/server/core"
        "github.com/apache/servicecomb-service-center/server/core/backend"
        "github.com/apache/servicecomb-service-center/server/core/proto"
-       "github.com/apache/servicecomb-service-center/server/notify"
+       "github.com/apache/servicecomb-service-center/server/event"
        "github.com/apache/servicecomb-service-center/server/plugin/discovery"
        "github.com/apache/servicecomb-service-center/server/service/cache"
        serviceUtil 
"github.com/apache/servicecomb-service-center/server/service/util"
@@ -112,7 +112,7 @@ func (h *TagEventHandler) OnEvent(evt discovery.KvEvent) {
 
        consumerID, domainProject := core.GetInfoFromTagKV(evt.KV.Key)
 
-       if notify.GetNotifyCenter().Closed() {
+       if event.Center().Closed() {
                log.Warnf("caught [%s] service tags[%s/%s] event, but notify 
service is closed",
                        action, consumerID, evt.KV.Value)
                return
diff --git a/server/service/watch.go b/server/service/watch.go
index a30b5f2..2163f03 100644
--- a/server/service/watch.go
+++ b/server/service/watch.go
@@ -23,8 +23,9 @@ import (
        "github.com/apache/servicecomb-service-center/pkg/log"
        pb "github.com/apache/servicecomb-service-center/pkg/registry"
        "github.com/apache/servicecomb-service-center/pkg/util"
+       "github.com/apache/servicecomb-service-center/server/connection/grpc"
+       "github.com/apache/servicecomb-service-center/server/connection/ws"
        "github.com/apache/servicecomb-service-center/server/core/proto"
-       "github.com/apache/servicecomb-service-center/server/notify"
        serviceUtil 
"github.com/apache/servicecomb-service-center/server/service/util"
        "github.com/gorilla/websocket"
 )
@@ -47,25 +48,25 @@ func (s *InstanceService) Watch(in 
*pb.WatchInstanceRequest, stream proto.Servic
                return err
        }
 
-       return notify.DoStreamListAndWatch(stream.Context(), in.SelfServiceId, 
nil, stream)
+       return grpc.ListAndWatch(stream.Context(), in.SelfServiceId, nil, 
stream)
 }
 
 func (s *InstanceService) WebSocketWatch(ctx context.Context, in 
*pb.WatchInstanceRequest, conn *websocket.Conn) {
        log.Infof("new a web socket watch with service[%s]", in.SelfServiceId)
        if err := s.WatchPreOpera(ctx, in); err != nil {
-               notify.EstablishWebSocketError(conn, err)
+               ws.SendEstablishError(conn, err)
                return
        }
-       notify.DoWebSocketListAndWatch(ctx, in.SelfServiceId, nil, conn)
+       ws.ListAndWatch(ctx, in.SelfServiceId, nil, conn)
 }
 
 func (s *InstanceService) WebSocketListAndWatch(ctx context.Context, in 
*pb.WatchInstanceRequest, conn *websocket.Conn) {
        log.Infof("new a web socket list and watch with service[%s]", 
in.SelfServiceId)
        if err := s.WatchPreOpera(ctx, in); err != nil {
-               notify.EstablishWebSocketError(conn, err)
+               ws.SendEstablishError(conn, err)
                return
        }
-       notify.DoWebSocketListAndWatch(ctx, in.SelfServiceId, func() 
([]*pb.WatchInstanceResponse, int64) {
+       ws.ListAndWatch(ctx, in.SelfServiceId, func() 
([]*pb.WatchInstanceResponse, int64) {
                return serviceUtil.QueryAllProvidersInstances(ctx, 
in.SelfServiceId)
        }, conn)
 }

Reply via email to