This is an automated email from the ASF dual-hosted git repository.

littlecui pushed a commit to branch alarm
in repository https://gitbox.apache.org/repos/asf/servicecomb-service-center.git

commit 2e6388701f0476d2cb2206ebfe81ed8dd93b463c
Author: little-cui <[email protected]>
AuthorDate: Thu Nov 29 18:39:31 2018 +0800

    SCB-1049 Restructure
---
 .../service/notification => pkg/notify}/common.go  | 44 +-------------
 .../service/notification => pkg/notify}/group.go   |  4 +-
 .../notification => pkg/notify}/group_test.go      | 15 ++---
 .../service/notification => pkg/notify}/notice.go  | 26 ++++----
 .../notify}/notification_healthchecker.go          | 24 +++-----
 .../notify}/notification_service.go                | 70 ++++++++++------------
 .../notify}/notification_test.go                   | 31 ++++------
 .../notification => pkg/notify}/processor.go       | 49 +++++++--------
 .../notification => pkg/notify}/processor_test.go  | 29 +++++----
 .../service/notification => pkg/notify}/subject.go |  4 +-
 .../notification => pkg/notify}/subject_test.go    |  9 +--
 .../notification => pkg/notify}/subscriber.go      | 36 +++++------
 pkg/notify/types.go                                | 59 ++++++++++++++++++
 pkg/notify/types_test.go                           | 36 +++++++++++
 server/alarm/alarm.go                              |  5 +-
 server/notify/common.go                            | 39 ++++++++++++
 .../notification => notify}/listwatcher.go         | 61 +++++++++----------
 .../{service/notification => notify}/publisher.go  |  2 +-
 server/{service/notification => notify}/stream.go  | 22 +++++--
 .../notification => notify}/stream_test.go         | 23 ++++---
 .../{service/notification => notify}/websocket.go  | 16 ++---
 .../notification => notify}/websocket_test.go      | 48 +++++++--------
 server/server.go                                   |  5 +-
 server/service/event/instance_event_handler.go     |  8 +--
 server/service/event/rule_event_handler.go         |  4 +-
 server/service/event/tag_event_handler.go          |  4 +-
 server/service/watch.go                            | 22 +++----
 27 files changed, 389 insertions(+), 306 deletions(-)

diff --git a/server/service/notification/common.go b/pkg/notify/common.go
similarity index 52%
rename from server/service/notification/common.go
rename to pkg/notify/common.go
index 1617f6f..5217aab 100644
--- a/server/service/notification/common.go
+++ b/pkg/notify/common.go
@@ -14,50 +14,12 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package notification
-
-import (
-       "strconv"
-       "time"
-)
+package notify
 
 const (
-       DEFAULT_MAX_QUEUE          = 1000
-       DEFAULT_ADD_JOB_TIMEOUT    = 1 * time.Second
-       DEFAULT_SEND_TIMEOUT       = 5 * time.Second
-       DEFAULT_HEARTBEAT_INTERVAL = 30 * time.Second
+       DefaultQueueSize = 1000
 )
 
 const (
-       NOTIFTY NotifyType = iota
-       INSTANCE
-       typeEnd
+       NOTIFTY Type = iota
 )
-
-type NotifyType int
-
-func (nt NotifyType) String() string {
-       if int(nt) < len(notifyTypeNames) {
-               return notifyTypeNames[nt]
-       }
-       return "NotifyType" + strconv.Itoa(int(nt))
-}
-
-func (nt NotifyType) QueueSize() (s int) {
-       if int(nt) < len(notifyTypeQueues) {
-               s = notifyTypeQueues[nt]
-       }
-       if s <= 0 {
-               s = DEFAULT_MAX_QUEUE
-       }
-       return
-}
-
-var notifyTypeNames = []string{
-       NOTIFTY:  "NOTIFTY",
-       INSTANCE: "INSTANCE",
-}
-
-var notifyTypeQueues = []int{
-       INSTANCE: 100 * 1000,
-}
diff --git a/server/service/notification/group.go b/pkg/notify/group.go
similarity index 96%
rename from server/service/notification/group.go
rename to pkg/notify/group.go
index 8c677d4..58f39a0 100644
--- a/server/service/notification/group.go
+++ b/pkg/notify/group.go
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package notification
+package notify
 
 import (
        "github.com/apache/servicecomb-service-center/pkg/util"
@@ -29,7 +29,7 @@ func (g *Group) Name() string {
        return g.name
 }
 
-func (g *Group) Notify(job NotifyJob) {
+func (g *Group) Notify(job Event) {
        g.subscribers.ForEach(func(item util.MapItem) (next bool) {
                item.Value.(Subscriber).OnMessage(job)
                return true
diff --git a/server/service/notification/group_test.go 
b/pkg/notify/group_test.go
similarity index 86%
rename from server/service/notification/group_test.go
rename to pkg/notify/group_test.go
index b241eb4..44b185d 100644
--- a/server/service/notification/group_test.go
+++ b/pkg/notify/group_test.go
@@ -14,20 +14,21 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package notification
+package notify
 
 import "testing"
 
 type mockSubscriber struct {
-       *BaseSubscriber
-       job NotifyJob
+       Subscriber
+       job Event
 }
 
-func (s *mockSubscriber) OnMessage(job NotifyJob) {
+func (s *mockSubscriber) OnMessage(job Event) {
        s.job = job
 }
 
 func TestGroup_Add(t *testing.T) {
+       INSTANCE := RegisterType("INSTANCE", 1)
        m := NewSubscriber(INSTANCE, "s1", "g1")
        g := NewGroup("g1")
        if g.Name() != "g1" {
@@ -39,7 +40,7 @@ func TestGroup_Add(t *testing.T) {
        if g.AddSubscriber(NewSubscriber(INSTANCE, "s1", "g1")) == m {
                t.Fatalf("TestGroup_Add failed")
        }
-       same := *m
+       same := *(m.(*baseSubscriber))
        if g.AddSubscriber(&same) != m {
                t.Fatalf("TestGroup_Add failed")
        }
@@ -54,14 +55,14 @@ func TestGroup_Add(t *testing.T) {
                t.Fatalf("TestGroup_Add failed")
        }
 
-       mock := &mockSubscriber{BaseSubscriber: NewSubscriber(INSTANCE, "s1", 
"g1")}
+       mock := &mockSubscriber{Subscriber: NewSubscriber(INSTANCE, "s1", "g1")}
        if g.AddSubscriber(mock) != mock {
                t.Fatalf("TestGroup_Add failed")
        }
        if g.Subscribers(mock.Id()) != mock {
                t.Fatalf("TestGroup_Add failed")
        }
-       job := &BaseNotifyJob{nType: INSTANCE}
+       job := &baseEvent{nType: INSTANCE}
        g.Notify(job)
        if mock.job != job {
                t.Fatalf("TestGroup_Add failed")
diff --git a/server/service/notification/notice.go b/pkg/notify/notice.go
similarity index 69%
rename from server/service/notification/notice.go
rename to pkg/notify/notice.go
index 26d5235..836d47b 100644
--- a/server/service/notification/notice.go
+++ b/pkg/notify/notice.go
@@ -14,28 +14,32 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package notification
+package notify
 
-type NotifyJob interface {
-       Type() NotifyType
-       Group() string
-       Subject() string
+type Event interface {
+       Type() Type
+       Subject() string // required!
+       Group() string   // broadcast all the subscriber of the same subject if 
group is empty
 }
 
-type BaseNotifyJob struct {
-       nType   NotifyType
+type baseEvent struct {
+       nType   Type
        subject string
        group   string
 }
 
-func (s *BaseNotifyJob) Type() NotifyType {
+func (s *baseEvent) Type() Type {
        return s.nType
 }
 
-func (s *BaseNotifyJob) Group() string {
+func (s *baseEvent) Subject() string {
+       return s.subject
+}
+
+func (s *baseEvent) Group() string {
        return s.group
 }
 
-func (s *BaseNotifyJob) Subject() string {
-       return s.subject
+func NewEvent(t Type, s string, g string) Event {
+       return &baseEvent{t, s, g}
 }
diff --git a/server/service/notification/notification_healthchecker.go 
b/pkg/notify/notification_healthchecker.go
similarity index 76%
rename from server/service/notification/notification_healthchecker.go
rename to pkg/notify/notification_healthchecker.go
index e529314..8cc41cd 100644
--- a/server/service/notification/notification_healthchecker.go
+++ b/pkg/notify/notification_healthchecker.go
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package notification
+package notify
 
 import "github.com/apache/servicecomb-service-center/pkg/log"
 
@@ -25,15 +25,15 @@ const (
 
 //Notifier 健康检查
 type NotifyServiceHealthChecker struct {
-       BaseSubscriber
+       Subscriber
 }
 
 type NotifyServiceHealthCheckJob struct {
-       *BaseNotifyJob
+       Event
        ErrorSubscriber Subscriber
 }
 
-func (s *NotifyServiceHealthChecker) OnMessage(job NotifyJob) {
+func (s *NotifyServiceHealthChecker) OnMessage(job Event) {
        j := job.(*NotifyServiceHealthCheckJob)
        err := j.ErrorSubscriber.Err()
 
@@ -43,28 +43,20 @@ func (s *NotifyServiceHealthChecker) OnMessage(job 
NotifyJob) {
                return
        }
 
-       log.Debugf("notification service remove %s watcher, error: %s, subject: 
%s, group: %s",
-               j.ErrorSubscriber.Type(), err.Error(), 
j.ErrorSubscriber.Subject(), j.ErrorSubscriber.Group())
+       log.Debugf("notification service remove %s watcher, error: %v, subject: 
%s, group: %s",
+               j.ErrorSubscriber.Type(), err, j.ErrorSubscriber.Subject(), 
j.ErrorSubscriber.Group())
        s.Service().RemoveSubscriber(j.ErrorSubscriber)
 }
 
 func NewNotifyServiceHealthChecker() *NotifyServiceHealthChecker {
        return &NotifyServiceHealthChecker{
-               BaseSubscriber: BaseSubscriber{
-                       group:   NOTIFY_SERVER_CHECKER_NAME,
-                       subject: NOTIFY_SERVER_CHECK_SUBJECT,
-                       nType:   NOTIFTY,
-               },
+               Subscriber: NewSubscriber(NOTIFTY, NOTIFY_SERVER_CHECK_SUBJECT, 
NOTIFY_SERVER_CHECKER_NAME),
        }
 }
 
 func NewNotifyServiceHealthCheckJob(s Subscriber) *NotifyServiceHealthCheckJob 
{
        return &NotifyServiceHealthCheckJob{
-               BaseNotifyJob: &BaseNotifyJob{
-                       group:   NOTIFY_SERVER_CHECKER_NAME,
-                       subject: NOTIFY_SERVER_CHECK_SUBJECT,
-                       nType:   NOTIFTY,
-               },
+               Event:           NewEvent(NOTIFTY, NOTIFY_SERVER_CHECK_SUBJECT, 
NOTIFY_SERVER_CHECKER_NAME),
                ErrorSubscriber: s,
        }
 }
diff --git a/server/service/notification/notification_service.go 
b/pkg/notify/notification_service.go
similarity index 66%
rename from server/service/notification/notification_service.go
rename to pkg/notify/notification_service.go
index 9c81401..3733004 100644
--- a/server/service/notification/notification_service.go
+++ b/pkg/notify/notification_service.go
@@ -14,29 +14,16 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package notification
+package notify
 
 import (
        "errors"
-       "github.com/apache/servicecomb-service-center/pkg/gopool"
        "github.com/apache/servicecomb-service-center/pkg/log"
-       "github.com/apache/servicecomb-service-center/pkg/util"
-       "golang.org/x/net/context"
        "sync"
 )
 
-var notifyService *NotifyService
-
-func init() {
-       notifyService = &NotifyService{
-               isClose:   true,
-               goroutine: gopool.New(context.Background()),
-       }
-}
-
 type NotifyService struct {
-       processors *util.ConcurrentMap
-       goroutine  *gopool.Pool
+       processors map[Type]*Processor
        err        chan error
        closeMux   sync.RWMutex
        isClose    bool
@@ -47,10 +34,9 @@ func (s *NotifyService) Err() <-chan error {
 }
 
 func (s *NotifyService) init() {
-       s.processors = util.NewConcurrentMap(int(typeEnd))
        s.err = make(chan error, 1)
-       for i := NotifyType(0); i != typeEnd; i++ {
-               s.processors.Put(i, NewProcessor(i.String(), i.QueueSize()))
+       for _, t := range Types() {
+               s.processors[t] = NewProcessor(t.String(), t.QueueSize())
        }
 }
 
@@ -67,12 +53,9 @@ func (s *NotifyService) Start() {
        // 错误subscriber清理
        s.AddSubscriber(NewNotifyServiceHealthChecker())
 
-       log.Debugf("notify service is started")
+       s.startProcessors()
 
-       s.processors.ForEach(func(item util.MapItem) (next bool) {
-               s.goroutine.Do(item.Value.(*Processor).Do)
-               return true
-       })
+       log.Debugf("notify service is started")
 }
 
 func (s *NotifyService) AddSubscriber(n Subscriber) error {
@@ -80,45 +63,51 @@ func (s *NotifyService) AddSubscriber(n Subscriber) error {
                return errors.New("server is shutting down")
        }
 
-       itf, ok := s.processors.Get(n.Type())
+       p, ok := s.processors[n.Type()]
        if !ok {
                return errors.New("Unknown subscribe type")
        }
        n.SetService(s)
        n.OnAccept()
 
-       itf.(*Processor).AddSubscriber(n)
+       p.AddSubscriber(n)
        return nil
 }
 
 func (s *NotifyService) RemoveSubscriber(n Subscriber) {
-       itf, ok := s.processors.Get(n.Type())
+       p, ok := s.processors[n.Type()]
        if !ok {
                return
        }
 
-       itf.(*Processor).Remove(n)
+       p.Remove(n)
        n.Close()
 }
 
-func (s *NotifyService) RemoveAllSubscribers() {
-       s.processors.ForEach(func(item util.MapItem) (next bool) {
-               item.Value.(*Processor).Clear()
-               return true
-       })
+func (s *NotifyService) startProcessors() {
+       for _, p := range s.processors {
+               p.Run()
+       }
+}
+
+func (s *NotifyService) stopProcessors() {
+       for _, p := range s.processors {
+               p.Clear()
+               p.Stop()
+       }
 }
 
 //通知内容塞到队列里
-func (s *NotifyService) AddJob(job NotifyJob) error {
+func (s *NotifyService) Publish(job Event) error {
        if s.Closed() {
                return errors.New("add notify job failed for server shutdown")
        }
 
-       itf, ok := s.processors.Get(job.Type())
+       p, ok := s.processors[job.Type()]
        if !ok {
                return errors.New("Unknown job type")
        }
-       itf.(*Processor).Accept(job)
+       p.Accept(job)
        return nil
 }
 
@@ -137,15 +126,16 @@ func (s *NotifyService) Stop() {
        s.isClose = true
        s.closeMux.Unlock()
 
-       s.goroutine.Close(true)
-
-       s.RemoveAllSubscribers()
+       s.stopProcessors()
 
        close(s.err)
 
        log.Debug("notify service stopped")
 }
 
-func GetNotifyService() *NotifyService {
-       return notifyService
+func NewNotifyService() *NotifyService {
+       return &NotifyService{
+               processors: make(map[Type]*Processor),
+               isClose:    true,
+       }
 }
diff --git a/server/service/notification/notification_test.go 
b/pkg/notify/notification_test.go
similarity index 76%
rename from server/service/notification/notification_test.go
rename to pkg/notify/notification_test.go
index 9a05f29..7e3360b 100644
--- a/server/service/notification/notification_test.go
+++ b/pkg/notify/notification_test.go
@@ -14,25 +14,15 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package notification
+package notify
 
 import (
-       "github.com/apache/servicecomb-service-center/pkg/gopool"
-       "golang.org/x/net/context"
        "testing"
        "time"
 )
 
 func TestGetNotifyService(t *testing.T) {
-       n := NotifyType(999)
-       if n.String() != "NotifyType999" {
-               t.Fatalf("TestGetNotifyService failed")
-       }
-
-       notifyService := &NotifyService{
-               isClose:   true,
-               goroutine: gopool.New(context.Background()),
-       }
+       notifyService := NewNotifyService()
        if notifyService == nil {
                t.Fatalf("TestGetNotifyService failed")
        }
@@ -44,11 +34,12 @@ func TestGetNotifyService(t *testing.T) {
        if err == nil {
                t.Fatalf("TestGetNotifyService failed")
        }
-       err = notifyService.AddJob(nil)
+       err = notifyService.Publish(nil)
        if err == nil {
                t.Fatalf("TestGetNotifyService failed")
        }
 
+       INSTANCE := RegisterType("INSTANCE", 1)
        notifyService.Start()
        notifyService.Start()
        if notifyService.Closed() != false {
@@ -65,25 +56,27 @@ func TestGetNotifyService(t *testing.T) {
        if err == nil {
                t.Fatalf("TestGetNotifyService failed")
        }
+       notifyService.RemoveSubscriber(s)
+
        s = NewSubscriber(INSTANCE, "s", "g")
        err = notifyService.AddSubscriber(s)
        if err != nil {
-               t.Fatalf("TestGetNotifyService failed")
+               t.Fatalf("TestGetNotifyService failed, %v", err)
        }
-       j := &BaseNotifyJob{INSTANCE, "s", "g"}
-       err = notifyService.AddJob(j)
+       j := &baseEvent{INSTANCE, "s", "g"}
+       err = notifyService.Publish(j)
        if err != nil {
                t.Fatalf("TestGetNotifyService failed")
        }
-       err = 
notifyService.AddJob(NewNotifyServiceHealthCheckJob(NewNotifyServiceHealthChecker()))
+       err = 
notifyService.Publish(NewNotifyServiceHealthCheckJob(NewNotifyServiceHealthChecker()))
        if err != nil {
                t.Fatalf("TestGetNotifyService failed")
        }
-       <-time.After(time.Second)
-       err = notifyService.AddJob(NewNotifyServiceHealthCheckJob(s))
+       err = notifyService.Publish(NewNotifyServiceHealthCheckJob(s))
        if err != nil {
                t.Fatalf("TestGetNotifyService failed")
        }
+       <-time.After(time.Second)
        notifyService.Stop()
        if notifyService.Closed() != true {
                t.Fatalf("TestGetNotifyService failed")
diff --git a/server/service/notification/processor.go b/pkg/notify/processor.go
similarity index 76%
rename from server/service/notification/processor.go
rename to pkg/notify/processor.go
index 17528a8..9df5288 100644
--- a/server/service/notification/processor.go
+++ b/pkg/notify/processor.go
@@ -14,25 +14,35 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package notification
+package notify
 
 import (
-       "github.com/apache/servicecomb-service-center/pkg/log"
+       "github.com/apache/servicecomb-service-center/pkg/queue"
        "github.com/apache/servicecomb-service-center/pkg/util"
        "golang.org/x/net/context"
 )
 
 type Processor struct {
+       *queue.TaskQueue
+
        name     string
        subjects *util.ConcurrentMap
-       queue    chan NotifyJob
+       queue    chan Event
 }
 
 func (p *Processor) Name() string {
        return p.name
 }
 
-func (p *Processor) Notify(job NotifyJob) {
+func (p *Processor) Accept(job Event) {
+       p.Add(queue.Task{Object: job})
+}
+
+func (p *Processor) Handle(ctx context.Context, obj interface{}) {
+       p.Notify(obj.(Event))
+}
+
+func (p *Processor) Notify(job Event) {
        if itf, ok := p.subjects.Get(job.Subject()); ok {
                itf.(*Subject).Notify(job)
        }
@@ -79,29 +89,12 @@ func (p *Processor) Clear() {
        p.subjects.Clear()
 }
 
-func (p *Processor) Accept(job NotifyJob) {
-       defer log.Recover()
-       p.queue <- job
-}
-
-func (p *Processor) Do(ctx context.Context) {
-       for {
-               select {
-               case <-ctx.Done():
-                       return
-               case job, ok := <-p.queue:
-                       if !ok {
-                               return
-                       }
-                       p.Notify(job)
-               }
-       }
-}
-
-func NewProcessor(name string, queue int) *Processor {
-       return &Processor{
-               name:     name,
-               subjects: util.NewConcurrentMap(0),
-               queue:    make(chan NotifyJob, queue),
+func NewProcessor(name string, queueSize int) *Processor {
+       p := &Processor{
+               TaskQueue: queue.NewTaskQueue(queueSize),
+               name:      name,
+               subjects:  util.NewConcurrentMap(0),
        }
+       p.AddWorker(p)
+       return p
 }
diff --git a/server/service/notification/processor_test.go 
b/pkg/notify/processor_test.go
similarity index 82%
rename from server/service/notification/processor_test.go
rename to pkg/notify/processor_test.go
index 7b6b748..3505457 100644
--- a/server/service/notification/processor_test.go
+++ b/pkg/notify/processor_test.go
@@ -14,31 +14,30 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package notification
+package notify
 
 import (
-       "github.com/apache/servicecomb-service-center/pkg/gopool"
        "testing"
        "time"
 )
 
 type mockSubscriberChan struct {
-       *BaseSubscriber
-       job chan NotifyJob
+       Subscriber
+       job chan Event
 }
 
-func (s *mockSubscriberChan) OnMessage(job NotifyJob) {
+func (s *mockSubscriberChan) OnMessage(job Event) {
        s.job <- job
 }
 
 func TestProcessor_Do(t *testing.T) {
+       INSTANCE := RegisterType("INSTANCE", 1)
        delay := 50 * time.Millisecond
-       mock1 := &mockSubscriberChan{BaseSubscriber: NewSubscriber(INSTANCE, 
"s1", "g1"),
-               job: make(chan NotifyJob, 1)}
-       mock2 := &mockSubscriberChan{BaseSubscriber: NewSubscriber(INSTANCE, 
"s1", "g2"),
-               job: make(chan NotifyJob, 1)}
+       mock1 := &mockSubscriberChan{Subscriber: NewSubscriber(INSTANCE, "s1", 
"g1"),
+               job: make(chan Event, 1)}
+       mock2 := &mockSubscriberChan{Subscriber: NewSubscriber(INSTANCE, "s1", 
"g2"),
+               job: make(chan Event, 1)}
        p := NewProcessor("p1", 0)
-       gopool.Go(p.Do)
        if p.Name() != "p1" {
                t.Fatalf("TestProcessor_Do")
        }
@@ -62,8 +61,8 @@ func TestProcessor_Do(t *testing.T) {
        }
        p.AddSubscriber(mock1)
        p.AddSubscriber(mock2)
-       job := &BaseNotifyJob{group: "g1"}
-       p.Accept(job)
+       job := &baseEvent{group: "g1"}
+       p.Handle(nil, job)
        select {
        case <-mock1.job:
                t.Fatalf("TestProcessor_Do")
@@ -71,7 +70,7 @@ func TestProcessor_Do(t *testing.T) {
        }
        job.subject = "s1"
        job.group = "g3"
-       p.Accept(job)
+       p.Handle(nil, job)
        select {
        case <-mock1.job:
                t.Fatalf("TestProcessor_Do")
@@ -79,7 +78,7 @@ func TestProcessor_Do(t *testing.T) {
        }
        job.subject = "s1"
        job.group = "g1"
-       p.Accept(job)
+       p.Handle(nil, job)
        select {
        case j := <-mock1.job:
                if j != job {
@@ -95,7 +94,7 @@ func TestProcessor_Do(t *testing.T) {
        }
        job.subject = "s1"
        job.group = ""
-       p.Accept(job)
+       p.Handle(nil, job)
        select {
        case j := <-mock1.job:
                if j != job {
diff --git a/server/service/notification/subject.go b/pkg/notify/subject.go
similarity index 96%
rename from server/service/notification/subject.go
rename to pkg/notify/subject.go
index cdc98f6..f052801 100644
--- a/server/service/notification/subject.go
+++ b/pkg/notify/subject.go
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package notification
+package notify
 
 import (
        "github.com/apache/servicecomb-service-center/pkg/util"
@@ -29,7 +29,7 @@ func (s *Subject) Name() string {
        return s.name
 }
 
-func (s *Subject) Notify(job NotifyJob) {
+func (s *Subject) Notify(job Event) {
        if len(job.Group()) == 0 {
                s.groups.ForEach(func(item util.MapItem) (next bool) {
                        item.Value.(*Group).Notify(job)
diff --git a/server/service/notification/subject_test.go 
b/pkg/notify/subject_test.go
similarity index 88%
rename from server/service/notification/subject_test.go
rename to pkg/notify/subject_test.go
index 293c500..c9e955c 100644
--- a/server/service/notification/subject_test.go
+++ b/pkg/notify/subject_test.go
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package notification
+package notify
 
 import "testing"
 
@@ -44,10 +44,11 @@ func TestSubject_Fetch(t *testing.T) {
        if s.Size() != 1 {
                t.Fatalf("TestSubject_Fetch failed")
        }
-       mock1 := &mockSubscriber{BaseSubscriber: NewSubscriber(INSTANCE, "s1", 
"g1")}
-       mock2 := &mockSubscriber{BaseSubscriber: NewSubscriber(INSTANCE, "s1", 
"g2")}
+       INSTANCE := RegisterType("INSTANCE", 1)
+       mock1 := &mockSubscriber{Subscriber: NewSubscriber(INSTANCE, "s1", 
"g1")}
+       mock2 := &mockSubscriber{Subscriber: NewSubscriber(INSTANCE, "s1", 
"g2")}
        g.AddSubscriber(mock1)
-       job := &BaseNotifyJob{group: "g3"}
+       job := &baseEvent{group: "g3"}
        s.Notify(job)
        if mock1.job != nil || mock2.job != nil {
                t.Fatalf("TestSubject_Fetch failed")
diff --git a/server/service/notification/subscriber.go 
b/pkg/notify/subscriber.go
similarity index 62%
rename from server/service/notification/subscriber.go
rename to pkg/notify/subscriber.go
index 24ec5db..7544919 100644
--- a/server/service/notification/subscriber.go
+++ b/pkg/notify/subscriber.go
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package notification
+package notify
 
 import (
        "errors"
@@ -25,7 +25,7 @@ type Subscriber interface {
        Id() string
        Subject() string
        Group() string
-       Type() NotifyType
+       Type() Type
        Service() *NotifyService
        SetService(*NotifyService)
 
@@ -35,34 +35,34 @@ type Subscriber interface {
        Close()
        OnAccept()
        // The event bus will callback this function, so it must be non-blocked.
-       OnMessage(job NotifyJob)
+       OnMessage(job Event)
 }
 
-type BaseSubscriber struct {
+type baseSubscriber struct {
+       nType   Type
        id      string
        subject string
        group   string
-       nType   NotifyType
        service *NotifyService
        err     error
 }
 
-func (s *BaseSubscriber) Id() string                    { return s.id }
-func (s *BaseSubscriber) Subject() string               { return s.subject }
-func (s *BaseSubscriber) Group() string                 { return s.group }
-func (s *BaseSubscriber) Type() NotifyType              { return s.nType }
-func (s *BaseSubscriber) Service() *NotifyService       { return s.service }
-func (s *BaseSubscriber) SetService(svc *NotifyService) { s.service = svc }
-func (s *BaseSubscriber) Err() error                    { return s.err }
-func (s *BaseSubscriber) SetError(err error)            { s.err = err }
-func (s *BaseSubscriber) Close()                        {}
-func (s *BaseSubscriber) OnAccept()                     {}
-func (s *BaseSubscriber) OnMessage(job NotifyJob) {
+func (s *baseSubscriber) Id() string                    { return s.id }
+func (s *baseSubscriber) Subject() string               { return s.subject }
+func (s *baseSubscriber) Group() string                 { return s.group }
+func (s *baseSubscriber) Type() Type                    { return s.nType }
+func (s *baseSubscriber) Service() *NotifyService       { return s.service }
+func (s *baseSubscriber) SetService(svc *NotifyService) { s.service = svc }
+func (s *baseSubscriber) Err() error                    { return s.err }
+func (s *baseSubscriber) SetError(err error)            { s.err = err }
+func (s *baseSubscriber) Close()                        {}
+func (s *baseSubscriber) OnAccept()                     {}
+func (s *baseSubscriber) OnMessage(job Event) {
        s.SetError(errors.New("do not call base notifier OnMessage method"))
 }
 
-func NewSubscriber(nType NotifyType, subject, group string) *BaseSubscriber {
-       return &BaseSubscriber{
+func NewSubscriber(nType Type, subject, group string) Subscriber {
+       return &baseSubscriber{
                id:      util.GenerateUuid(),
                group:   group,
                subject: subject,
diff --git a/pkg/notify/types.go b/pkg/notify/types.go
new file mode 100644
index 0000000..829eea5
--- /dev/null
+++ b/pkg/notify/types.go
@@ -0,0 +1,59 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package notify
+
+import "strconv"
+
+type Type int
+
+func (nt Type) String() string {
+       if int(nt) < len(typeNames) {
+               return typeNames[nt]
+       }
+       return "Type" + strconv.Itoa(int(nt))
+}
+
+func (nt Type) QueueSize() (s int) {
+       if int(nt) < len(typeQueues) {
+               s = typeQueues[nt]
+       }
+       if s <= 0 {
+               s = DefaultQueueSize
+       }
+       return
+}
+
+var typeNames = []string{
+       NOTIFTY: "NOTIFTY",
+}
+
+var typeQueues = []int{
+       NOTIFTY: 0,
+}
+
+func Types() (ts []Type) {
+       for i := range typeNames {
+               ts = append(ts, Type(i))
+       }
+       return
+}
+
+func RegisterType(name string, size int) Type {
+       l := len(typeNames)
+       typeNames = append(typeNames, name)
+       typeQueues = append(typeQueues, size)
+       return Type(l)
+}
diff --git a/pkg/notify/types_test.go b/pkg/notify/types_test.go
new file mode 100644
index 0000000..720af81
--- /dev/null
+++ b/pkg/notify/types_test.go
@@ -0,0 +1,36 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package notify
+
+import "testing"
+
+func TestRegisterType(t *testing.T) {
+       id := RegisterType("a", 0)
+       if id.String() != "a" || id.QueueSize() != DefaultQueueSize {
+               t.Fatal("TestRegisterType failed", id.String(), id.QueueSize())
+       }
+       id = RegisterType("b", 1)
+       if id.String() != "b" || id.QueueSize() != 1 {
+               t.Fatal("TestRegisterType failed", id.String(), id.QueueSize())
+       }
+       id = Type(999)
+       if id.String() != "Type999" || id.QueueSize() != DefaultQueueSize {
+               t.Fatal("TestRegisterType failed", id.String(), id.QueueSize())
+       }
+       if NOTIFTY.String() != "NOTIFTY" || NOTIFTY.QueueSize() != 
DefaultQueueSize {
+               t.Fatal("TestRegisterType failed", id.String(), id.QueueSize())
+       }
+}
diff --git a/server/alarm/alarm.go b/server/alarm/alarm.go
index 64281b9..e3639e7 100644
--- a/server/alarm/alarm.go
+++ b/server/alarm/alarm.go
@@ -16,7 +16,8 @@
 package alarm
 
 import (
-       nf 
"github.com/apache/servicecomb-service-center/server/service/notification"
+       nf "github.com/apache/servicecomb-service-center/pkg/notify"
+       "github.com/apache/servicecomb-service-center/server/notify"
        "sync/atomic"
 )
 
@@ -80,5 +81,5 @@ func Alarm(id ID, fields ...Field) error {
        for _, f := range fields {
                ae.fields[f.Key] = f.Value
        }
-       return nf.GetNotifyService().Publish(ae)
+       return notify.NotifyCenter().Publish(ae)
 }
diff --git a/server/notify/common.go b/server/notify/common.go
new file mode 100644
index 0000000..b130cb1
--- /dev/null
+++ b/server/notify/common.go
@@ -0,0 +1,39 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package notify
+
+import (
+       "github.com/apache/servicecomb-service-center/pkg/notify"
+       "time"
+)
+
+const (
+       AddJobTimeout          = 1 * time.Second
+       SendTimeout            = 5 * time.Second
+       HeartbeatTimeout       = 30 * time.Second
+       InstanceEventQueueSize = 5000
+)
+
+var INSTANCE = notify.RegisterType("INSTANCE", InstanceEventQueueSize)
+var notifyService *notify.NotifyService
+
+func init() {
+       notifyService = notify.NewNotifyService()
+}
+
+func NotifyCenter() *notify.NotifyService {
+       return notifyService
+}
diff --git a/server/service/notification/listwatcher.go 
b/server/notify/listwatcher.go
similarity index 66%
rename from server/service/notification/listwatcher.go
rename to server/notify/listwatcher.go
index d0a90f8..0b831d7 100644
--- a/server/service/notification/listwatcher.go
+++ b/server/notify/listwatcher.go
@@ -14,38 +14,39 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package notification
+package notify
 
 import (
        "github.com/apache/servicecomb-service-center/pkg/gopool"
        "github.com/apache/servicecomb-service-center/pkg/log"
+       "github.com/apache/servicecomb-service-center/pkg/notify"
        pb "github.com/apache/servicecomb-service-center/server/core/proto"
        "golang.org/x/net/context"
        "time"
 )
 
 // 状态变化推送
-type WatchJob struct {
-       *BaseNotifyJob
+type InstanceEvent struct {
+       notify.Event
        Revision int64
        Response *pb.WatchInstanceResponse
 }
 
-type ListWatcher struct {
-       *BaseSubscriber
-       Job          chan *WatchJob
+type InstanceEventListWatcher struct {
+       notify.Subscriber
+       Job          chan *InstanceEvent
        ListRevision int64
        ListFunc     func() (results []*pb.WatchInstanceResponse, rev int64)
        listCh       chan struct{}
 }
 
-func (s *ListWatcher) SetError(err error) {
-       s.BaseSubscriber.SetError(err)
+func (s *InstanceEventListWatcher) SetError(err error) {
+       s.Subscriber.SetError(err)
        // 触发清理job
-       s.Service().AddJob(NewNotifyServiceHealthCheckJob(s))
+       s.Service().Publish(notify.NewNotifyServiceHealthCheckJob(s))
 }
 
-func (w *ListWatcher) OnAccept() {
+func (w *InstanceEventListWatcher) OnAccept() {
        if w.Err() != nil {
                return
        }
@@ -53,7 +54,7 @@ func (w *ListWatcher) OnAccept() {
        gopool.Go(w.listAndPublishJobs)
 }
 
-func (w *ListWatcher) listAndPublishJobs(_ context.Context) {
+func (w *InstanceEventListWatcher) listAndPublishJobs(_ context.Context) {
        defer close(w.listCh)
        if w.ListFunc == nil {
                return
@@ -61,17 +62,17 @@ func (w *ListWatcher) listAndPublishJobs(_ context.Context) 
{
        results, rev := w.ListFunc()
        w.ListRevision = rev
        for _, response := range results {
-               w.sendMessage(NewWatchJob(w.Group(), w.Subject(), 
w.ListRevision, response))
+               w.sendMessage(NewInstanceEvent(w.Group(), w.Subject(), 
w.ListRevision, response))
        }
 }
 
 //被通知
-func (w *ListWatcher) OnMessage(job NotifyJob) {
+func (w *InstanceEventListWatcher) OnMessage(job notify.Event) {
        if w.Err() != nil {
                return
        }
 
-       wJob, ok := job.(*WatchJob)
+       wJob, ok := job.(*InstanceEvent)
        if !ok {
                return
        }
@@ -98,7 +99,7 @@ func (w *ListWatcher) OnMessage(job NotifyJob) {
        w.sendMessage(wJob)
 }
 
-func (w *ListWatcher) sendMessage(job *WatchJob) {
+func (w *InstanceEventListWatcher) sendMessage(job *InstanceEvent) {
        defer log.Recover()
        select {
        case w.Job <- job:
@@ -115,33 +116,29 @@ func (w *ListWatcher) sendMessage(job *WatchJob) {
        }
 }
 
-func (w *ListWatcher) Timeout() time.Duration {
-       return DEFAULT_ADD_JOB_TIMEOUT
+func (w *InstanceEventListWatcher) Timeout() time.Duration {
+       return AddJobTimeout
 }
 
-func (w *ListWatcher) Close() {
+func (w *InstanceEventListWatcher) Close() {
        close(w.Job)
 }
 
-func NewWatchJob(group, subject string, rev int64, response 
*pb.WatchInstanceResponse) *WatchJob {
-       return &WatchJob{
-               BaseNotifyJob: &BaseNotifyJob{
-                       group:   group,
-                       subject: subject,
-                       nType:   INSTANCE,
-               },
+func NewInstanceEvent(group, subject string, rev int64, response 
*pb.WatchInstanceResponse) *InstanceEvent {
+       return &InstanceEvent{
+               Event:    notify.NewEvent(INSTANCE, subject, group),
                Revision: rev,
                Response: response,
        }
 }
 
-func NewListWatcher(group string, subject string,
-       listFunc func() (results []*pb.WatchInstanceResponse, rev int64)) 
*ListWatcher {
-       watcher := &ListWatcher{
-               BaseSubscriber: NewSubscriber(INSTANCE, subject, group),
-               Job:            make(chan *WatchJob, DEFAULT_MAX_QUEUE),
-               ListFunc:       listFunc,
-               listCh:         make(chan struct{}),
+func NewInstanceEventListWatcher(group string, subject string,
+       listFunc func() (results []*pb.WatchInstanceResponse, rev int64)) 
*InstanceEventListWatcher {
+       watcher := &InstanceEventListWatcher{
+               Subscriber: notify.NewSubscriber(INSTANCE, subject, group),
+               Job:        make(chan *InstanceEvent, INSTANCE.QueueSize()),
+               ListFunc:   listFunc,
+               listCh:     make(chan struct{}),
        }
        return watcher
 }
diff --git a/server/service/notification/publisher.go 
b/server/notify/publisher.go
similarity index 99%
rename from server/service/notification/publisher.go
rename to server/notify/publisher.go
index 70aa7b2..fc0d703 100644
--- a/server/service/notification/publisher.go
+++ b/server/notify/publisher.go
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package notification
+package notify
 
 import (
        "github.com/apache/servicecomb-service-center/pkg/gopool"
diff --git a/server/service/notification/stream.go b/server/notify/stream.go
similarity index 68%
rename from server/service/notification/stream.go
rename to server/notify/stream.go
index 7fc663d..b358c30 100644
--- a/server/service/notification/stream.go
+++ b/server/notify/stream.go
@@ -14,23 +14,25 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package notification
+package notify
 
 import (
        "errors"
        "github.com/apache/servicecomb-service-center/pkg/log"
        "github.com/apache/servicecomb-service-center/pkg/util"
+       apt "github.com/apache/servicecomb-service-center/server/core"
        pb "github.com/apache/servicecomb-service-center/server/core/proto"
+       "golang.org/x/net/context"
        "time"
 )
 
-func HandleWatchJob(watcher *ListWatcher, stream 
pb.ServiceInstanceCtrl_WatchServer) (err error) {
-       timer := time.NewTimer(DEFAULT_HEARTBEAT_INTERVAL)
+func HandleWatchJob(watcher *InstanceEventListWatcher, stream 
pb.ServiceInstanceCtrl_WatchServer) (err error) {
+       timer := time.NewTimer(HeartbeatTimeout)
        defer timer.Stop()
        for {
                select {
                case <-timer.C:
-                       timer.Reset(DEFAULT_HEARTBEAT_INTERVAL)
+                       timer.Reset(HeartbeatTimeout)
 
                        // TODO grpc 长连接心跳?
                case job := <-watcher.Job:
@@ -52,7 +54,17 @@ func HandleWatchJob(watcher *ListWatcher, stream 
pb.ServiceInstanceCtrl_WatchSer
                                return
                        }
 
-                       util.ResetTimer(timer, DEFAULT_HEARTBEAT_INTERVAL)
+                       util.ResetTimer(timer, HeartbeatTimeout)
                }
        }
 }
+
+func DoStreamListAndWatch(ctx context.Context, serviceId string, f func() 
([]*pb.WatchInstanceResponse, int64), stream 
pb.ServiceInstanceCtrl_WatchServer) error {
+       domainProject := util.ParseDomainProject(ctx)
+       watcher := NewInstanceEventListWatcher(serviceId, 
apt.GetInstanceRootKey(domainProject)+"/", f)
+       err := NotifyCenter().AddSubscriber(watcher)
+       if err != nil {
+               return err
+       }
+       return HandleWatchJob(watcher, stream)
+}
diff --git a/server/service/notification/stream_test.go 
b/server/notify/stream_test.go
similarity index 68%
rename from server/service/notification/stream_test.go
rename to server/notify/stream_test.go
index 5193045..e99ef3e 100644
--- a/server/service/notification/stream_test.go
+++ b/server/notify/stream_test.go
@@ -14,21 +14,30 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package notification
+package notify
 
-import "testing"
+import (
+       "github.com/apache/servicecomb-service-center/pkg/log"
+       "golang.org/x/net/context"
+       "testing"
+)
 
 func TestHandleWatchJob(t *testing.T) {
-       defer func() { recover() }()
-       w := NewListWatcher("g", "s", nil)
+       defer log.Recover()
+       w := NewInstanceEventListWatcher("g", "s", nil)
        w.Job <- nil
        err := HandleWatchJob(w, nil)
        if err == nil {
                t.Fatalf("TestHandleWatchJob failed")
        }
-       w.Job <- NewWatchJob("g", "s", 1, nil)
+       w.Job <- NewInstanceEvent("g", "s", 1, nil)
        err = HandleWatchJob(w, nil)
-       if err != nil {
-               t.Fatalf("TestHandleWatchJob failed")
+       t.Fatalf("TestHandleWatchJob failed")
+}
+
+func TestDoStreamListAndWatch(t *testing.T) {
+       err := DoStreamListAndWatch(context.Background(), "s", nil, nil)
+       if err == nil {
+               t.Fatal("TestDoStreamListAndWatch failed", err)
        }
 }
diff --git a/server/service/notification/websocket.go 
b/server/notify/websocket.go
similarity index 95%
rename from server/service/notification/websocket.go
rename to server/notify/websocket.go
index 07db6f6..64f6203 100644
--- a/server/service/notification/websocket.go
+++ b/server/notify/websocket.go
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package notification
+package notify
 
 import (
        "encoding/json"
@@ -34,14 +34,14 @@ type WebSocket struct {
        ticker *time.Ticker
        conn   *websocket.Conn
        // watcher subscribe the notification service event
-       watcher         *ListWatcher
+       watcher         *InstanceEventListWatcher
        needPingWatcher bool
        free            chan struct{}
        closed          chan struct{}
 }
 
 func (wh *WebSocket) Init() error {
-       wh.ticker = time.NewTicker(DEFAULT_HEARTBEAT_INTERVAL)
+       wh.ticker = time.NewTicker(HeartbeatTimeout)
        wh.needPingWatcher = true
        wh.free = make(chan struct{}, 1)
        wh.closed = make(chan struct{})
@@ -51,7 +51,7 @@ func (wh *WebSocket) Init() error {
        remoteAddr := wh.conn.RemoteAddr().String()
 
        // put in notification service queue
-       if err := GetNotifyService().AddSubscriber(wh.watcher); err != nil {
+       if err := NotifyCenter().AddSubscriber(wh.watcher); err != nil {
                err = fmt.Errorf("establish[%s] websocket watch failed: notify 
service error, %s",
                        remoteAddr, err.Error())
                log.Errorf(nil, err.Error())
@@ -72,7 +72,7 @@ func (wh *WebSocket) Init() error {
 }
 
 func (wh *WebSocket) Timeout() time.Duration {
-       return DEFAULT_SEND_TIMEOUT
+       return SendTimeout
 }
 
 func (wh *WebSocket) heartbeat(messageType int) error {
@@ -194,8 +194,8 @@ func (wh *WebSocket) HandleWatchWebSocketJob(o interface{}) 
{
                        remoteAddr, wh.watcher.Subject(), wh.watcher.Group())
                wh.heartbeat(websocket.PingMessage)
                return
-       case *WatchJob:
-               job := o.(*WatchJob)
+       case *InstanceEvent:
+               job := o.(*InstanceEvent)
                resp := job.Response
 
                providerFlag := fmt.Sprintf("%s/%s/%s", resp.Key.AppId, 
resp.Key.ServiceName, resp.Key.Version)
@@ -253,7 +253,7 @@ func DoWebSocketListAndWatch(ctx context.Context, serviceId 
string, f func() ([]
        socket := &WebSocket{
                ctx:     ctx,
                conn:    conn,
-               watcher: NewListWatcher(serviceId, 
apt.GetInstanceRootKey(domainProject)+"/", f),
+               watcher: NewInstanceEventListWatcher(serviceId, 
apt.GetInstanceRootKey(domainProject)+"/", f),
        }
        process(socket)
 }
diff --git a/server/service/notification/websocket_test.go 
b/server/notify/websocket_test.go
similarity index 90%
rename from server/service/notification/websocket_test.go
rename to server/notify/websocket_test.go
index 403a2e3..a8b61e5 100644
--- a/server/service/notification/websocket_test.go
+++ b/server/notify/websocket_test.go
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package notification
+package notify
 
 import _ "github.com/apache/servicecomb-service-center/server/init"
 import (
@@ -56,31 +56,12 @@ func (h *watcherConn) ServeHTTP(w http.ResponseWriter, r 
*http.Request) {
 func TestDoWebSocketListAndWatch(t *testing.T) {
        s := httptest.NewServer(&watcherConn{})
 
-       GetNotifyService().Start()
-
        conn, _, _ := websocket.DefaultDialer.Dial(
                strings.Replace(s.URL, "http://";, "ws://", 1), nil)
 
-       go func() {
-               DoWebSocketListAndWatch(context.Background(), "", nil, conn)
-
-               w2 := NewListWatcher("g", "s", func() (results 
[]*proto.WatchInstanceResponse, rev int64) {
-                       return
-               })
-               ws2 := &WebSocket{
-                       ctx:     context.Background(),
-                       conn:    conn,
-                       watcher: w2,
-               }
-               err := ws2.Init()
-               if err != nil {
-                       t.Fatalf("TestPublisher_Run")
-               }
-       }()
-
        EstablishWebSocketError(conn, errors.New("error"))
 
-       w := NewListWatcher("g", "s", func() (results 
[]*proto.WatchInstanceResponse, rev int64) {
+       w := NewInstanceEventListWatcher("g", "s", func() (results 
[]*proto.WatchInstanceResponse, rev int64) {
                results = append(results, &proto.WatchInstanceResponse{
                        Response: proto.CreateResponse(proto.Response_SUCCESS, 
"ok"),
                        Action:   string(proto.EVT_CREATE),
@@ -89,7 +70,6 @@ func TestDoWebSocketListAndWatch(t *testing.T) {
                })
                return
        })
-       w.nType = -1
 
        ws := &WebSocket{
                ctx:     context.Background(),
@@ -101,7 +81,25 @@ func TestDoWebSocketListAndWatch(t *testing.T) {
                t.Fatalf("TestPublisher_Run")
        }
 
-       w.nType = INSTANCE
+       NotifyCenter().Start()
+
+       go func() {
+               DoWebSocketListAndWatch(context.Background(), "", nil, conn)
+
+               w2 := NewInstanceEventListWatcher("g", "s", func() (results 
[]*proto.WatchInstanceResponse, rev int64) {
+                       return
+               })
+               ws2 := &WebSocket{
+                       ctx:     context.Background(),
+                       conn:    conn,
+                       watcher: w2,
+               }
+               err := ws2.Init()
+               if err != nil {
+                       t.Fatalf("TestPublisher_Run")
+               }
+       }()
+
        err = ws.Init()
        if err != nil {
                t.Fatalf("TestPublisher_Run")
@@ -109,9 +107,9 @@ func TestDoWebSocketListAndWatch(t *testing.T) {
        go ws.HandleWatchWebSocketControlMessage()
 
        w.OnMessage(nil)
-       w.OnMessage(&WatchJob{})
+       w.OnMessage(&InstanceEvent{})
 
-       GetNotifyService().AddJob(NewWatchJob("g", "s", 1, 
&proto.WatchInstanceResponse{
+       NotifyCenter().Publish(NewInstanceEvent("g", "s", 1, 
&proto.WatchInstanceResponse{
                Response: proto.CreateResponse(proto.Response_SUCCESS, "ok"),
                Action:   string(proto.EVT_CREATE),
                Key:      &proto.MicroServiceKey{},
diff --git a/server/server.go b/server/server.go
index 5d28e74..f7af7b7 100644
--- a/server/server.go
+++ b/server/server.go
@@ -21,11 +21,12 @@ import (
        "fmt"
        "github.com/apache/servicecomb-service-center/pkg/gopool"
        "github.com/apache/servicecomb-service-center/pkg/log"
+       nf "github.com/apache/servicecomb-service-center/pkg/notify"
        "github.com/apache/servicecomb-service-center/server/core"
        "github.com/apache/servicecomb-service-center/server/core/backend"
        "github.com/apache/servicecomb-service-center/server/mux"
+       "github.com/apache/servicecomb-service-center/server/notify"
        "github.com/apache/servicecomb-service-center/server/plugin"
-       nf 
"github.com/apache/servicecomb-service-center/server/service/notification"
        serviceUtil 
"github.com/apache/servicecomb-service-center/server/service/util"
        "github.com/apache/servicecomb-service-center/version"
        "github.com/astaxie/beego"
@@ -132,7 +133,7 @@ func (s *ServiceCenterServer) compactBackendService() {
 
 func (s *ServiceCenterServer) initialize() {
        s.store = backend.Store()
-       s.notifyService = nf.GetNotifyService()
+       s.notifyService = notify.NotifyCenter()
        s.apiServer = GetAPIServer()
        s.goroutine = gopool.New(context.Background())
 
diff --git a/server/service/event/instance_event_handler.go 
b/server/service/event/instance_event_handler.go
index d526d74..d1c81ee 100644
--- a/server/service/event/instance_event_handler.go
+++ b/server/service/event/instance_event_handler.go
@@ -22,10 +22,10 @@ import (
        apt "github.com/apache/servicecomb-service-center/server/core"
        "github.com/apache/servicecomb-service-center/server/core/backend"
        pb "github.com/apache/servicecomb-service-center/server/core/proto"
+       "github.com/apache/servicecomb-service-center/server/notify"
        
"github.com/apache/servicecomb-service-center/server/plugin/pkg/discovery"
        "github.com/apache/servicecomb-service-center/server/service/cache"
        "github.com/apache/servicecomb-service-center/server/service/metrics"
-       nf 
"github.com/apache/servicecomb-service-center/server/service/notification"
        serviceUtil 
"github.com/apache/servicecomb-service-center/server/service/util"
        "golang.org/x/net/context"
        "strings"
@@ -59,7 +59,7 @@ func (h *InstanceEventHandler) OnEvent(evt discovery.KvEvent) 
{
                }
        }
 
-       if nf.GetNotifyService().Closed() {
+       if notify.NotifyCenter().Closed() {
                log.Warnf("caught [%s] instance[%s/%s] event, but notify 
service is closed",
                        action, providerId, providerInstanceId)
                return
@@ -110,7 +110,7 @@ func PublishInstanceEvent(domainProject string, action 
pb.EventType, serviceKey
        }
        for _, consumerId := range subscribers {
                // TODO add超时怎么处理?
-               job := nf.NewWatchJob(consumerId, 
apt.GetInstanceRootKey(domainProject)+"/", rev, response)
-               nf.GetNotifyService().AddJob(job)
+               job := notify.NewInstanceEvent(consumerId, 
apt.GetInstanceRootKey(domainProject)+"/", rev, response)
+               notify.NotifyCenter().Publish(job)
        }
 }
diff --git a/server/service/event/rule_event_handler.go 
b/server/service/event/rule_event_handler.go
index 095dd0e..58051d9 100644
--- a/server/service/event/rule_event_handler.go
+++ b/server/service/event/rule_event_handler.go
@@ -23,8 +23,8 @@ import (
        "github.com/apache/servicecomb-service-center/server/core"
        "github.com/apache/servicecomb-service-center/server/core/backend"
        pb "github.com/apache/servicecomb-service-center/server/core/proto"
+       "github.com/apache/servicecomb-service-center/server/notify"
        
"github.com/apache/servicecomb-service-center/server/plugin/pkg/discovery"
-       nf 
"github.com/apache/servicecomb-service-center/server/service/notification"
        serviceUtil 
"github.com/apache/servicecomb-service-center/server/service/util"
        "golang.org/x/net/context"
 )
@@ -92,7 +92,7 @@ func (h *RuleEventHandler) OnEvent(evt discovery.KvEvent) {
        }
 
        providerId, ruleId, domainProject := core.GetInfoFromRuleKV(evt.KV.Key)
-       if nf.GetNotifyService().Closed() {
+       if notify.NotifyCenter().Closed() {
                log.Warnf("caught [%s] service rule[%s/%s] event, but notify 
service is closed",
                        action, providerId, ruleId)
                return
diff --git a/server/service/event/tag_event_handler.go 
b/server/service/event/tag_event_handler.go
index fb7ea03..7059e8f 100644
--- a/server/service/event/tag_event_handler.go
+++ b/server/service/event/tag_event_handler.go
@@ -23,9 +23,9 @@ import (
        "github.com/apache/servicecomb-service-center/server/core"
        "github.com/apache/servicecomb-service-center/server/core/backend"
        pb "github.com/apache/servicecomb-service-center/server/core/proto"
+       "github.com/apache/servicecomb-service-center/server/notify"
        
"github.com/apache/servicecomb-service-center/server/plugin/pkg/discovery"
        "github.com/apache/servicecomb-service-center/server/service/cache"
-       nf 
"github.com/apache/servicecomb-service-center/server/service/notification"
        serviceUtil 
"github.com/apache/servicecomb-service-center/server/service/util"
        "golang.org/x/net/context"
 )
@@ -106,7 +106,7 @@ func (h *TagEventHandler) OnEvent(evt discovery.KvEvent) {
 
        consumerId, domainProject := core.GetInfoFromTagKV(evt.KV.Key)
 
-       if nf.GetNotifyService().Closed() {
+       if notify.NotifyCenter().Closed() {
                log.Warnf("caught [%s] service tags[%s/%s] event, but notify 
service is closed",
                        action, consumerId, evt.KV.Value)
                return
diff --git a/server/service/watch.go b/server/service/watch.go
index 4e1a629..8b4c039 100644
--- a/server/service/watch.go
+++ b/server/service/watch.go
@@ -20,9 +20,8 @@ import (
        "errors"
        "github.com/apache/servicecomb-service-center/pkg/log"
        "github.com/apache/servicecomb-service-center/pkg/util"
-       apt "github.com/apache/servicecomb-service-center/server/core"
        pb "github.com/apache/servicecomb-service-center/server/core/proto"
-       nf 
"github.com/apache/servicecomb-service-center/server/service/notification"
+       "github.com/apache/servicecomb-service-center/server/notify"
        serviceUtil 
"github.com/apache/servicecomb-service-center/server/service/util"
        "github.com/gorilla/websocket"
        "golang.org/x/net/context"
@@ -40,34 +39,31 @@ func (s *InstanceService) WatchPreOpera(ctx 
context.Context, in *pb.WatchInstanc
 }
 
 func (s *InstanceService) Watch(in *pb.WatchInstanceRequest, stream 
pb.ServiceInstanceCtrl_WatchServer) error {
-       var err error
-       if err = s.WatchPreOpera(stream.Context(), in); err != nil {
+       log.Infof("new a stream list and watch with service[%s]", 
in.SelfServiceId)
+       if err := s.WatchPreOpera(stream.Context(), in); err != nil {
                log.Errorf(err, "service[%s] establish watch failed: invalid 
params", in.SelfServiceId)
                return err
        }
-       domainProject := util.ParseDomainProject(stream.Context())
-       watcher := nf.NewListWatcher(in.SelfServiceId, 
apt.GetInstanceRootKey(domainProject)+"/", nil)
-       err = nf.GetNotifyService().AddSubscriber(watcher)
-       log.Infof("watcher[%s/%s] start watch instance status", 
watcher.Subject(), watcher.Group())
-       return nf.HandleWatchJob(watcher, stream)
+
+       return notify.DoStreamListAndWatch(stream.Context(), in.SelfServiceId, 
nil, stream)
 }
 
 func (s *InstanceService) WebSocketWatch(ctx context.Context, in 
*pb.WatchInstanceRequest, conn *websocket.Conn) {
        log.Infof("new a web socket watch with service[%s]", in.SelfServiceId)
        if err := s.WatchPreOpera(ctx, in); err != nil {
-               nf.EstablishWebSocketError(conn, err)
+               notify.EstablishWebSocketError(conn, err)
                return
        }
-       nf.DoWebSocketListAndWatch(ctx, in.SelfServiceId, nil, conn)
+       notify.DoWebSocketListAndWatch(ctx, in.SelfServiceId, nil, conn)
 }
 
 func (s *InstanceService) WebSocketListAndWatch(ctx context.Context, in 
*pb.WatchInstanceRequest, conn *websocket.Conn) {
        log.Infof("new a web socket list and watch with service[%s]", 
in.SelfServiceId)
        if err := s.WatchPreOpera(ctx, in); err != nil {
-               nf.EstablishWebSocketError(conn, err)
+               notify.EstablishWebSocketError(conn, err)
                return
        }
-       nf.DoWebSocketListAndWatch(ctx, in.SelfServiceId, func() 
([]*pb.WatchInstanceResponse, int64) {
+       notify.DoWebSocketListAndWatch(ctx, in.SelfServiceId, func() 
([]*pb.WatchInstanceResponse, int64) {
                return serviceUtil.QueryAllProvidersInstances(ctx, 
in.SelfServiceId)
        }, conn)
 }

Reply via email to