This is an automated email from the ASF dual-hosted git repository. littlecui pushed a commit to branch alarm in repository https://gitbox.apache.org/repos/asf/servicecomb-service-center.git
commit 2e6388701f0476d2cb2206ebfe81ed8dd93b463c Author: little-cui <[email protected]> AuthorDate: Thu Nov 29 18:39:31 2018 +0800 SCB-1049 Restructure --- .../service/notification => pkg/notify}/common.go | 44 +------------- .../service/notification => pkg/notify}/group.go | 4 +- .../notification => pkg/notify}/group_test.go | 15 ++--- .../service/notification => pkg/notify}/notice.go | 26 ++++---- .../notify}/notification_healthchecker.go | 24 +++----- .../notify}/notification_service.go | 70 ++++++++++------------ .../notify}/notification_test.go | 31 ++++------ .../notification => pkg/notify}/processor.go | 49 +++++++-------- .../notification => pkg/notify}/processor_test.go | 29 +++++---- .../service/notification => pkg/notify}/subject.go | 4 +- .../notification => pkg/notify}/subject_test.go | 9 +-- .../notification => pkg/notify}/subscriber.go | 36 +++++------ pkg/notify/types.go | 59 ++++++++++++++++++ pkg/notify/types_test.go | 36 +++++++++++ server/alarm/alarm.go | 5 +- server/notify/common.go | 39 ++++++++++++ .../notification => notify}/listwatcher.go | 61 +++++++++---------- .../{service/notification => notify}/publisher.go | 2 +- server/{service/notification => notify}/stream.go | 22 +++++-- .../notification => notify}/stream_test.go | 23 ++++--- .../{service/notification => notify}/websocket.go | 16 ++--- .../notification => notify}/websocket_test.go | 48 +++++++-------- server/server.go | 5 +- server/service/event/instance_event_handler.go | 8 +-- server/service/event/rule_event_handler.go | 4 +- server/service/event/tag_event_handler.go | 4 +- server/service/watch.go | 22 +++---- 27 files changed, 389 insertions(+), 306 deletions(-) diff --git a/server/service/notification/common.go b/pkg/notify/common.go similarity index 52% rename from server/service/notification/common.go rename to pkg/notify/common.go index 1617f6f..5217aab 100644 --- a/server/service/notification/common.go +++ b/pkg/notify/common.go @@ -14,50 +14,12 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package notification - -import ( - "strconv" - "time" -) +package notify const ( - DEFAULT_MAX_QUEUE = 1000 - DEFAULT_ADD_JOB_TIMEOUT = 1 * time.Second - DEFAULT_SEND_TIMEOUT = 5 * time.Second - DEFAULT_HEARTBEAT_INTERVAL = 30 * time.Second + DefaultQueueSize = 1000 ) const ( - NOTIFTY NotifyType = iota - INSTANCE - typeEnd + NOTIFTY Type = iota ) - -type NotifyType int - -func (nt NotifyType) String() string { - if int(nt) < len(notifyTypeNames) { - return notifyTypeNames[nt] - } - return "NotifyType" + strconv.Itoa(int(nt)) -} - -func (nt NotifyType) QueueSize() (s int) { - if int(nt) < len(notifyTypeQueues) { - s = notifyTypeQueues[nt] - } - if s <= 0 { - s = DEFAULT_MAX_QUEUE - } - return -} - -var notifyTypeNames = []string{ - NOTIFTY: "NOTIFTY", - INSTANCE: "INSTANCE", -} - -var notifyTypeQueues = []int{ - INSTANCE: 100 * 1000, -} diff --git a/server/service/notification/group.go b/pkg/notify/group.go similarity index 96% rename from server/service/notification/group.go rename to pkg/notify/group.go index 8c677d4..58f39a0 100644 --- a/server/service/notification/group.go +++ b/pkg/notify/group.go @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package notification +package notify import ( "github.com/apache/servicecomb-service-center/pkg/util" @@ -29,7 +29,7 @@ func (g *Group) Name() string { return g.name } -func (g *Group) Notify(job NotifyJob) { +func (g *Group) Notify(job Event) { g.subscribers.ForEach(func(item util.MapItem) (next bool) { item.Value.(Subscriber).OnMessage(job) return true diff --git a/server/service/notification/group_test.go b/pkg/notify/group_test.go similarity index 86% rename from server/service/notification/group_test.go rename to pkg/notify/group_test.go index b241eb4..44b185d 100644 --- a/server/service/notification/group_test.go +++ b/pkg/notify/group_test.go @@ -14,20 +14,21 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package notification +package notify import "testing" type mockSubscriber struct { - *BaseSubscriber - job NotifyJob + Subscriber + job Event } -func (s *mockSubscriber) OnMessage(job NotifyJob) { +func (s *mockSubscriber) OnMessage(job Event) { s.job = job } func TestGroup_Add(t *testing.T) { + INSTANCE := RegisterType("INSTANCE", 1) m := NewSubscriber(INSTANCE, "s1", "g1") g := NewGroup("g1") if g.Name() != "g1" { @@ -39,7 +40,7 @@ func TestGroup_Add(t *testing.T) { if g.AddSubscriber(NewSubscriber(INSTANCE, "s1", "g1")) == m { t.Fatalf("TestGroup_Add failed") } - same := *m + same := *(m.(*baseSubscriber)) if g.AddSubscriber(&same) != m { t.Fatalf("TestGroup_Add failed") } @@ -54,14 +55,14 @@ func TestGroup_Add(t *testing.T) { t.Fatalf("TestGroup_Add failed") } - mock := &mockSubscriber{BaseSubscriber: NewSubscriber(INSTANCE, "s1", "g1")} + mock := &mockSubscriber{Subscriber: NewSubscriber(INSTANCE, "s1", "g1")} if g.AddSubscriber(mock) != mock { t.Fatalf("TestGroup_Add failed") } if g.Subscribers(mock.Id()) != mock { t.Fatalf("TestGroup_Add failed") } - job := &BaseNotifyJob{nType: INSTANCE} + job := &baseEvent{nType: INSTANCE} g.Notify(job) if mock.job != job { t.Fatalf("TestGroup_Add failed") diff --git a/server/service/notification/notice.go b/pkg/notify/notice.go similarity index 69% rename from server/service/notification/notice.go rename to pkg/notify/notice.go index 26d5235..836d47b 100644 --- a/server/service/notification/notice.go +++ b/pkg/notify/notice.go @@ -14,28 +14,32 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package notification +package notify -type NotifyJob interface { - Type() NotifyType - Group() string - Subject() string +type Event interface { + Type() Type + Subject() string // required! + Group() string // broadcast all the subscriber of the same subject if group is empty } -type BaseNotifyJob struct { - nType NotifyType +type baseEvent struct { + nType Type subject string group string } -func (s *BaseNotifyJob) Type() NotifyType { +func (s *baseEvent) Type() Type { return s.nType } -func (s *BaseNotifyJob) Group() string { +func (s *baseEvent) Subject() string { + return s.subject +} + +func (s *baseEvent) Group() string { return s.group } -func (s *BaseNotifyJob) Subject() string { - return s.subject +func NewEvent(t Type, s string, g string) Event { + return &baseEvent{t, s, g} } diff --git a/server/service/notification/notification_healthchecker.go b/pkg/notify/notification_healthchecker.go similarity index 76% rename from server/service/notification/notification_healthchecker.go rename to pkg/notify/notification_healthchecker.go index e529314..8cc41cd 100644 --- a/server/service/notification/notification_healthchecker.go +++ b/pkg/notify/notification_healthchecker.go @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package notification +package notify import "github.com/apache/servicecomb-service-center/pkg/log" @@ -25,15 +25,15 @@ const ( //Notifier 健康检查 type NotifyServiceHealthChecker struct { - BaseSubscriber + Subscriber } type NotifyServiceHealthCheckJob struct { - *BaseNotifyJob + Event ErrorSubscriber Subscriber } -func (s *NotifyServiceHealthChecker) OnMessage(job NotifyJob) { +func (s *NotifyServiceHealthChecker) OnMessage(job Event) { j := job.(*NotifyServiceHealthCheckJob) err := j.ErrorSubscriber.Err() @@ -43,28 +43,20 @@ func (s *NotifyServiceHealthChecker) OnMessage(job NotifyJob) { return } - log.Debugf("notification service remove %s watcher, error: %s, subject: %s, group: %s", - j.ErrorSubscriber.Type(), err.Error(), j.ErrorSubscriber.Subject(), j.ErrorSubscriber.Group()) + log.Debugf("notification service remove %s watcher, error: %v, subject: %s, group: %s", + j.ErrorSubscriber.Type(), err, j.ErrorSubscriber.Subject(), j.ErrorSubscriber.Group()) s.Service().RemoveSubscriber(j.ErrorSubscriber) } func NewNotifyServiceHealthChecker() *NotifyServiceHealthChecker { return &NotifyServiceHealthChecker{ - BaseSubscriber: BaseSubscriber{ - group: NOTIFY_SERVER_CHECKER_NAME, - subject: NOTIFY_SERVER_CHECK_SUBJECT, - nType: NOTIFTY, - }, + Subscriber: NewSubscriber(NOTIFTY, NOTIFY_SERVER_CHECK_SUBJECT, NOTIFY_SERVER_CHECKER_NAME), } } func NewNotifyServiceHealthCheckJob(s Subscriber) *NotifyServiceHealthCheckJob { return &NotifyServiceHealthCheckJob{ - BaseNotifyJob: &BaseNotifyJob{ - group: NOTIFY_SERVER_CHECKER_NAME, - subject: NOTIFY_SERVER_CHECK_SUBJECT, - nType: NOTIFTY, - }, + Event: NewEvent(NOTIFTY, NOTIFY_SERVER_CHECK_SUBJECT, NOTIFY_SERVER_CHECKER_NAME), ErrorSubscriber: s, } } diff --git a/server/service/notification/notification_service.go b/pkg/notify/notification_service.go similarity index 66% rename from server/service/notification/notification_service.go rename to pkg/notify/notification_service.go index 9c81401..3733004 100644 --- a/server/service/notification/notification_service.go +++ b/pkg/notify/notification_service.go @@ -14,29 +14,16 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package notification +package notify import ( "errors" - "github.com/apache/servicecomb-service-center/pkg/gopool" "github.com/apache/servicecomb-service-center/pkg/log" - "github.com/apache/servicecomb-service-center/pkg/util" - "golang.org/x/net/context" "sync" ) -var notifyService *NotifyService - -func init() { - notifyService = &NotifyService{ - isClose: true, - goroutine: gopool.New(context.Background()), - } -} - type NotifyService struct { - processors *util.ConcurrentMap - goroutine *gopool.Pool + processors map[Type]*Processor err chan error closeMux sync.RWMutex isClose bool @@ -47,10 +34,9 @@ func (s *NotifyService) Err() <-chan error { } func (s *NotifyService) init() { - s.processors = util.NewConcurrentMap(int(typeEnd)) s.err = make(chan error, 1) - for i := NotifyType(0); i != typeEnd; i++ { - s.processors.Put(i, NewProcessor(i.String(), i.QueueSize())) + for _, t := range Types() { + s.processors[t] = NewProcessor(t.String(), t.QueueSize()) } } @@ -67,12 +53,9 @@ func (s *NotifyService) Start() { // 错误subscriber清理 s.AddSubscriber(NewNotifyServiceHealthChecker()) - log.Debugf("notify service is started") + s.startProcessors() - s.processors.ForEach(func(item util.MapItem) (next bool) { - s.goroutine.Do(item.Value.(*Processor).Do) - return true - }) + log.Debugf("notify service is started") } func (s *NotifyService) AddSubscriber(n Subscriber) error { @@ -80,45 +63,51 @@ func (s *NotifyService) AddSubscriber(n Subscriber) error { return errors.New("server is shutting down") } - itf, ok := s.processors.Get(n.Type()) + p, ok := s.processors[n.Type()] if !ok { return errors.New("Unknown subscribe type") } n.SetService(s) n.OnAccept() - itf.(*Processor).AddSubscriber(n) + p.AddSubscriber(n) return nil } func (s *NotifyService) RemoveSubscriber(n Subscriber) { - itf, ok := s.processors.Get(n.Type()) + p, ok := s.processors[n.Type()] if !ok { return } - itf.(*Processor).Remove(n) + p.Remove(n) n.Close() } -func (s *NotifyService) RemoveAllSubscribers() { - s.processors.ForEach(func(item util.MapItem) (next bool) { - item.Value.(*Processor).Clear() - return true - }) +func (s *NotifyService) startProcessors() { + for _, p := range s.processors { + p.Run() + } +} + +func (s *NotifyService) stopProcessors() { + for _, p := range s.processors { + p.Clear() + p.Stop() + } } //通知内容塞到队列里 -func (s *NotifyService) AddJob(job NotifyJob) error { +func (s *NotifyService) Publish(job Event) error { if s.Closed() { return errors.New("add notify job failed for server shutdown") } - itf, ok := s.processors.Get(job.Type()) + p, ok := s.processors[job.Type()] if !ok { return errors.New("Unknown job type") } - itf.(*Processor).Accept(job) + p.Accept(job) return nil } @@ -137,15 +126,16 @@ func (s *NotifyService) Stop() { s.isClose = true s.closeMux.Unlock() - s.goroutine.Close(true) - - s.RemoveAllSubscribers() + s.stopProcessors() close(s.err) log.Debug("notify service stopped") } -func GetNotifyService() *NotifyService { - return notifyService +func NewNotifyService() *NotifyService { + return &NotifyService{ + processors: make(map[Type]*Processor), + isClose: true, + } } diff --git a/server/service/notification/notification_test.go b/pkg/notify/notification_test.go similarity index 76% rename from server/service/notification/notification_test.go rename to pkg/notify/notification_test.go index 9a05f29..7e3360b 100644 --- a/server/service/notification/notification_test.go +++ b/pkg/notify/notification_test.go @@ -14,25 +14,15 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package notification +package notify import ( - "github.com/apache/servicecomb-service-center/pkg/gopool" - "golang.org/x/net/context" "testing" "time" ) func TestGetNotifyService(t *testing.T) { - n := NotifyType(999) - if n.String() != "NotifyType999" { - t.Fatalf("TestGetNotifyService failed") - } - - notifyService := &NotifyService{ - isClose: true, - goroutine: gopool.New(context.Background()), - } + notifyService := NewNotifyService() if notifyService == nil { t.Fatalf("TestGetNotifyService failed") } @@ -44,11 +34,12 @@ func TestGetNotifyService(t *testing.T) { if err == nil { t.Fatalf("TestGetNotifyService failed") } - err = notifyService.AddJob(nil) + err = notifyService.Publish(nil) if err == nil { t.Fatalf("TestGetNotifyService failed") } + INSTANCE := RegisterType("INSTANCE", 1) notifyService.Start() notifyService.Start() if notifyService.Closed() != false { @@ -65,25 +56,27 @@ func TestGetNotifyService(t *testing.T) { if err == nil { t.Fatalf("TestGetNotifyService failed") } + notifyService.RemoveSubscriber(s) + s = NewSubscriber(INSTANCE, "s", "g") err = notifyService.AddSubscriber(s) if err != nil { - t.Fatalf("TestGetNotifyService failed") + t.Fatalf("TestGetNotifyService failed, %v", err) } - j := &BaseNotifyJob{INSTANCE, "s", "g"} - err = notifyService.AddJob(j) + j := &baseEvent{INSTANCE, "s", "g"} + err = notifyService.Publish(j) if err != nil { t.Fatalf("TestGetNotifyService failed") } - err = notifyService.AddJob(NewNotifyServiceHealthCheckJob(NewNotifyServiceHealthChecker())) + err = notifyService.Publish(NewNotifyServiceHealthCheckJob(NewNotifyServiceHealthChecker())) if err != nil { t.Fatalf("TestGetNotifyService failed") } - <-time.After(time.Second) - err = notifyService.AddJob(NewNotifyServiceHealthCheckJob(s)) + err = notifyService.Publish(NewNotifyServiceHealthCheckJob(s)) if err != nil { t.Fatalf("TestGetNotifyService failed") } + <-time.After(time.Second) notifyService.Stop() if notifyService.Closed() != true { t.Fatalf("TestGetNotifyService failed") diff --git a/server/service/notification/processor.go b/pkg/notify/processor.go similarity index 76% rename from server/service/notification/processor.go rename to pkg/notify/processor.go index 17528a8..9df5288 100644 --- a/server/service/notification/processor.go +++ b/pkg/notify/processor.go @@ -14,25 +14,35 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package notification +package notify import ( - "github.com/apache/servicecomb-service-center/pkg/log" + "github.com/apache/servicecomb-service-center/pkg/queue" "github.com/apache/servicecomb-service-center/pkg/util" "golang.org/x/net/context" ) type Processor struct { + *queue.TaskQueue + name string subjects *util.ConcurrentMap - queue chan NotifyJob + queue chan Event } func (p *Processor) Name() string { return p.name } -func (p *Processor) Notify(job NotifyJob) { +func (p *Processor) Accept(job Event) { + p.Add(queue.Task{Object: job}) +} + +func (p *Processor) Handle(ctx context.Context, obj interface{}) { + p.Notify(obj.(Event)) +} + +func (p *Processor) Notify(job Event) { if itf, ok := p.subjects.Get(job.Subject()); ok { itf.(*Subject).Notify(job) } @@ -79,29 +89,12 @@ func (p *Processor) Clear() { p.subjects.Clear() } -func (p *Processor) Accept(job NotifyJob) { - defer log.Recover() - p.queue <- job -} - -func (p *Processor) Do(ctx context.Context) { - for { - select { - case <-ctx.Done(): - return - case job, ok := <-p.queue: - if !ok { - return - } - p.Notify(job) - } - } -} - -func NewProcessor(name string, queue int) *Processor { - return &Processor{ - name: name, - subjects: util.NewConcurrentMap(0), - queue: make(chan NotifyJob, queue), +func NewProcessor(name string, queueSize int) *Processor { + p := &Processor{ + TaskQueue: queue.NewTaskQueue(queueSize), + name: name, + subjects: util.NewConcurrentMap(0), } + p.AddWorker(p) + return p } diff --git a/server/service/notification/processor_test.go b/pkg/notify/processor_test.go similarity index 82% rename from server/service/notification/processor_test.go rename to pkg/notify/processor_test.go index 7b6b748..3505457 100644 --- a/server/service/notification/processor_test.go +++ b/pkg/notify/processor_test.go @@ -14,31 +14,30 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package notification +package notify import ( - "github.com/apache/servicecomb-service-center/pkg/gopool" "testing" "time" ) type mockSubscriberChan struct { - *BaseSubscriber - job chan NotifyJob + Subscriber + job chan Event } -func (s *mockSubscriberChan) OnMessage(job NotifyJob) { +func (s *mockSubscriberChan) OnMessage(job Event) { s.job <- job } func TestProcessor_Do(t *testing.T) { + INSTANCE := RegisterType("INSTANCE", 1) delay := 50 * time.Millisecond - mock1 := &mockSubscriberChan{BaseSubscriber: NewSubscriber(INSTANCE, "s1", "g1"), - job: make(chan NotifyJob, 1)} - mock2 := &mockSubscriberChan{BaseSubscriber: NewSubscriber(INSTANCE, "s1", "g2"), - job: make(chan NotifyJob, 1)} + mock1 := &mockSubscriberChan{Subscriber: NewSubscriber(INSTANCE, "s1", "g1"), + job: make(chan Event, 1)} + mock2 := &mockSubscriberChan{Subscriber: NewSubscriber(INSTANCE, "s1", "g2"), + job: make(chan Event, 1)} p := NewProcessor("p1", 0) - gopool.Go(p.Do) if p.Name() != "p1" { t.Fatalf("TestProcessor_Do") } @@ -62,8 +61,8 @@ func TestProcessor_Do(t *testing.T) { } p.AddSubscriber(mock1) p.AddSubscriber(mock2) - job := &BaseNotifyJob{group: "g1"} - p.Accept(job) + job := &baseEvent{group: "g1"} + p.Handle(nil, job) select { case <-mock1.job: t.Fatalf("TestProcessor_Do") @@ -71,7 +70,7 @@ func TestProcessor_Do(t *testing.T) { } job.subject = "s1" job.group = "g3" - p.Accept(job) + p.Handle(nil, job) select { case <-mock1.job: t.Fatalf("TestProcessor_Do") @@ -79,7 +78,7 @@ func TestProcessor_Do(t *testing.T) { } job.subject = "s1" job.group = "g1" - p.Accept(job) + p.Handle(nil, job) select { case j := <-mock1.job: if j != job { @@ -95,7 +94,7 @@ func TestProcessor_Do(t *testing.T) { } job.subject = "s1" job.group = "" - p.Accept(job) + p.Handle(nil, job) select { case j := <-mock1.job: if j != job { diff --git a/server/service/notification/subject.go b/pkg/notify/subject.go similarity index 96% rename from server/service/notification/subject.go rename to pkg/notify/subject.go index cdc98f6..f052801 100644 --- a/server/service/notification/subject.go +++ b/pkg/notify/subject.go @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package notification +package notify import ( "github.com/apache/servicecomb-service-center/pkg/util" @@ -29,7 +29,7 @@ func (s *Subject) Name() string { return s.name } -func (s *Subject) Notify(job NotifyJob) { +func (s *Subject) Notify(job Event) { if len(job.Group()) == 0 { s.groups.ForEach(func(item util.MapItem) (next bool) { item.Value.(*Group).Notify(job) diff --git a/server/service/notification/subject_test.go b/pkg/notify/subject_test.go similarity index 88% rename from server/service/notification/subject_test.go rename to pkg/notify/subject_test.go index 293c500..c9e955c 100644 --- a/server/service/notification/subject_test.go +++ b/pkg/notify/subject_test.go @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package notification +package notify import "testing" @@ -44,10 +44,11 @@ func TestSubject_Fetch(t *testing.T) { if s.Size() != 1 { t.Fatalf("TestSubject_Fetch failed") } - mock1 := &mockSubscriber{BaseSubscriber: NewSubscriber(INSTANCE, "s1", "g1")} - mock2 := &mockSubscriber{BaseSubscriber: NewSubscriber(INSTANCE, "s1", "g2")} + INSTANCE := RegisterType("INSTANCE", 1) + mock1 := &mockSubscriber{Subscriber: NewSubscriber(INSTANCE, "s1", "g1")} + mock2 := &mockSubscriber{Subscriber: NewSubscriber(INSTANCE, "s1", "g2")} g.AddSubscriber(mock1) - job := &BaseNotifyJob{group: "g3"} + job := &baseEvent{group: "g3"} s.Notify(job) if mock1.job != nil || mock2.job != nil { t.Fatalf("TestSubject_Fetch failed") diff --git a/server/service/notification/subscriber.go b/pkg/notify/subscriber.go similarity index 62% rename from server/service/notification/subscriber.go rename to pkg/notify/subscriber.go index 24ec5db..7544919 100644 --- a/server/service/notification/subscriber.go +++ b/pkg/notify/subscriber.go @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package notification +package notify import ( "errors" @@ -25,7 +25,7 @@ type Subscriber interface { Id() string Subject() string Group() string - Type() NotifyType + Type() Type Service() *NotifyService SetService(*NotifyService) @@ -35,34 +35,34 @@ type Subscriber interface { Close() OnAccept() // The event bus will callback this function, so it must be non-blocked. - OnMessage(job NotifyJob) + OnMessage(job Event) } -type BaseSubscriber struct { +type baseSubscriber struct { + nType Type id string subject string group string - nType NotifyType service *NotifyService err error } -func (s *BaseSubscriber) Id() string { return s.id } -func (s *BaseSubscriber) Subject() string { return s.subject } -func (s *BaseSubscriber) Group() string { return s.group } -func (s *BaseSubscriber) Type() NotifyType { return s.nType } -func (s *BaseSubscriber) Service() *NotifyService { return s.service } -func (s *BaseSubscriber) SetService(svc *NotifyService) { s.service = svc } -func (s *BaseSubscriber) Err() error { return s.err } -func (s *BaseSubscriber) SetError(err error) { s.err = err } -func (s *BaseSubscriber) Close() {} -func (s *BaseSubscriber) OnAccept() {} -func (s *BaseSubscriber) OnMessage(job NotifyJob) { +func (s *baseSubscriber) Id() string { return s.id } +func (s *baseSubscriber) Subject() string { return s.subject } +func (s *baseSubscriber) Group() string { return s.group } +func (s *baseSubscriber) Type() Type { return s.nType } +func (s *baseSubscriber) Service() *NotifyService { return s.service } +func (s *baseSubscriber) SetService(svc *NotifyService) { s.service = svc } +func (s *baseSubscriber) Err() error { return s.err } +func (s *baseSubscriber) SetError(err error) { s.err = err } +func (s *baseSubscriber) Close() {} +func (s *baseSubscriber) OnAccept() {} +func (s *baseSubscriber) OnMessage(job Event) { s.SetError(errors.New("do not call base notifier OnMessage method")) } -func NewSubscriber(nType NotifyType, subject, group string) *BaseSubscriber { - return &BaseSubscriber{ +func NewSubscriber(nType Type, subject, group string) Subscriber { + return &baseSubscriber{ id: util.GenerateUuid(), group: group, subject: subject, diff --git a/pkg/notify/types.go b/pkg/notify/types.go new file mode 100644 index 0000000..829eea5 --- /dev/null +++ b/pkg/notify/types.go @@ -0,0 +1,59 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package notify + +import "strconv" + +type Type int + +func (nt Type) String() string { + if int(nt) < len(typeNames) { + return typeNames[nt] + } + return "Type" + strconv.Itoa(int(nt)) +} + +func (nt Type) QueueSize() (s int) { + if int(nt) < len(typeQueues) { + s = typeQueues[nt] + } + if s <= 0 { + s = DefaultQueueSize + } + return +} + +var typeNames = []string{ + NOTIFTY: "NOTIFTY", +} + +var typeQueues = []int{ + NOTIFTY: 0, +} + +func Types() (ts []Type) { + for i := range typeNames { + ts = append(ts, Type(i)) + } + return +} + +func RegisterType(name string, size int) Type { + l := len(typeNames) + typeNames = append(typeNames, name) + typeQueues = append(typeQueues, size) + return Type(l) +} diff --git a/pkg/notify/types_test.go b/pkg/notify/types_test.go new file mode 100644 index 0000000..720af81 --- /dev/null +++ b/pkg/notify/types_test.go @@ -0,0 +1,36 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package notify + +import "testing" + +func TestRegisterType(t *testing.T) { + id := RegisterType("a", 0) + if id.String() != "a" || id.QueueSize() != DefaultQueueSize { + t.Fatal("TestRegisterType failed", id.String(), id.QueueSize()) + } + id = RegisterType("b", 1) + if id.String() != "b" || id.QueueSize() != 1 { + t.Fatal("TestRegisterType failed", id.String(), id.QueueSize()) + } + id = Type(999) + if id.String() != "Type999" || id.QueueSize() != DefaultQueueSize { + t.Fatal("TestRegisterType failed", id.String(), id.QueueSize()) + } + if NOTIFTY.String() != "NOTIFTY" || NOTIFTY.QueueSize() != DefaultQueueSize { + t.Fatal("TestRegisterType failed", id.String(), id.QueueSize()) + } +} diff --git a/server/alarm/alarm.go b/server/alarm/alarm.go index 64281b9..e3639e7 100644 --- a/server/alarm/alarm.go +++ b/server/alarm/alarm.go @@ -16,7 +16,8 @@ package alarm import ( - nf "github.com/apache/servicecomb-service-center/server/service/notification" + nf "github.com/apache/servicecomb-service-center/pkg/notify" + "github.com/apache/servicecomb-service-center/server/notify" "sync/atomic" ) @@ -80,5 +81,5 @@ func Alarm(id ID, fields ...Field) error { for _, f := range fields { ae.fields[f.Key] = f.Value } - return nf.GetNotifyService().Publish(ae) + return notify.NotifyCenter().Publish(ae) } diff --git a/server/notify/common.go b/server/notify/common.go new file mode 100644 index 0000000..b130cb1 --- /dev/null +++ b/server/notify/common.go @@ -0,0 +1,39 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package notify + +import ( + "github.com/apache/servicecomb-service-center/pkg/notify" + "time" +) + +const ( + AddJobTimeout = 1 * time.Second + SendTimeout = 5 * time.Second + HeartbeatTimeout = 30 * time.Second + InstanceEventQueueSize = 5000 +) + +var INSTANCE = notify.RegisterType("INSTANCE", InstanceEventQueueSize) +var notifyService *notify.NotifyService + +func init() { + notifyService = notify.NewNotifyService() +} + +func NotifyCenter() *notify.NotifyService { + return notifyService +} diff --git a/server/service/notification/listwatcher.go b/server/notify/listwatcher.go similarity index 66% rename from server/service/notification/listwatcher.go rename to server/notify/listwatcher.go index d0a90f8..0b831d7 100644 --- a/server/service/notification/listwatcher.go +++ b/server/notify/listwatcher.go @@ -14,38 +14,39 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package notification +package notify import ( "github.com/apache/servicecomb-service-center/pkg/gopool" "github.com/apache/servicecomb-service-center/pkg/log" + "github.com/apache/servicecomb-service-center/pkg/notify" pb "github.com/apache/servicecomb-service-center/server/core/proto" "golang.org/x/net/context" "time" ) // 状态变化推送 -type WatchJob struct { - *BaseNotifyJob +type InstanceEvent struct { + notify.Event Revision int64 Response *pb.WatchInstanceResponse } -type ListWatcher struct { - *BaseSubscriber - Job chan *WatchJob +type InstanceEventListWatcher struct { + notify.Subscriber + Job chan *InstanceEvent ListRevision int64 ListFunc func() (results []*pb.WatchInstanceResponse, rev int64) listCh chan struct{} } -func (s *ListWatcher) SetError(err error) { - s.BaseSubscriber.SetError(err) +func (s *InstanceEventListWatcher) SetError(err error) { + s.Subscriber.SetError(err) // 触发清理job - s.Service().AddJob(NewNotifyServiceHealthCheckJob(s)) + s.Service().Publish(notify.NewNotifyServiceHealthCheckJob(s)) } -func (w *ListWatcher) OnAccept() { +func (w *InstanceEventListWatcher) OnAccept() { if w.Err() != nil { return } @@ -53,7 +54,7 @@ func (w *ListWatcher) OnAccept() { gopool.Go(w.listAndPublishJobs) } -func (w *ListWatcher) listAndPublishJobs(_ context.Context) { +func (w *InstanceEventListWatcher) listAndPublishJobs(_ context.Context) { defer close(w.listCh) if w.ListFunc == nil { return @@ -61,17 +62,17 @@ func (w *ListWatcher) listAndPublishJobs(_ context.Context) { results, rev := w.ListFunc() w.ListRevision = rev for _, response := range results { - w.sendMessage(NewWatchJob(w.Group(), w.Subject(), w.ListRevision, response)) + w.sendMessage(NewInstanceEvent(w.Group(), w.Subject(), w.ListRevision, response)) } } //被通知 -func (w *ListWatcher) OnMessage(job NotifyJob) { +func (w *InstanceEventListWatcher) OnMessage(job notify.Event) { if w.Err() != nil { return } - wJob, ok := job.(*WatchJob) + wJob, ok := job.(*InstanceEvent) if !ok { return } @@ -98,7 +99,7 @@ func (w *ListWatcher) OnMessage(job NotifyJob) { w.sendMessage(wJob) } -func (w *ListWatcher) sendMessage(job *WatchJob) { +func (w *InstanceEventListWatcher) sendMessage(job *InstanceEvent) { defer log.Recover() select { case w.Job <- job: @@ -115,33 +116,29 @@ func (w *ListWatcher) sendMessage(job *WatchJob) { } } -func (w *ListWatcher) Timeout() time.Duration { - return DEFAULT_ADD_JOB_TIMEOUT +func (w *InstanceEventListWatcher) Timeout() time.Duration { + return AddJobTimeout } -func (w *ListWatcher) Close() { +func (w *InstanceEventListWatcher) Close() { close(w.Job) } -func NewWatchJob(group, subject string, rev int64, response *pb.WatchInstanceResponse) *WatchJob { - return &WatchJob{ - BaseNotifyJob: &BaseNotifyJob{ - group: group, - subject: subject, - nType: INSTANCE, - }, +func NewInstanceEvent(group, subject string, rev int64, response *pb.WatchInstanceResponse) *InstanceEvent { + return &InstanceEvent{ + Event: notify.NewEvent(INSTANCE, subject, group), Revision: rev, Response: response, } } -func NewListWatcher(group string, subject string, - listFunc func() (results []*pb.WatchInstanceResponse, rev int64)) *ListWatcher { - watcher := &ListWatcher{ - BaseSubscriber: NewSubscriber(INSTANCE, subject, group), - Job: make(chan *WatchJob, DEFAULT_MAX_QUEUE), - ListFunc: listFunc, - listCh: make(chan struct{}), +func NewInstanceEventListWatcher(group string, subject string, + listFunc func() (results []*pb.WatchInstanceResponse, rev int64)) *InstanceEventListWatcher { + watcher := &InstanceEventListWatcher{ + Subscriber: notify.NewSubscriber(INSTANCE, subject, group), + Job: make(chan *InstanceEvent, INSTANCE.QueueSize()), + ListFunc: listFunc, + listCh: make(chan struct{}), } return watcher } diff --git a/server/service/notification/publisher.go b/server/notify/publisher.go similarity index 99% rename from server/service/notification/publisher.go rename to server/notify/publisher.go index 70aa7b2..fc0d703 100644 --- a/server/service/notification/publisher.go +++ b/server/notify/publisher.go @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package notification +package notify import ( "github.com/apache/servicecomb-service-center/pkg/gopool" diff --git a/server/service/notification/stream.go b/server/notify/stream.go similarity index 68% rename from server/service/notification/stream.go rename to server/notify/stream.go index 7fc663d..b358c30 100644 --- a/server/service/notification/stream.go +++ b/server/notify/stream.go @@ -14,23 +14,25 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package notification +package notify import ( "errors" "github.com/apache/servicecomb-service-center/pkg/log" "github.com/apache/servicecomb-service-center/pkg/util" + apt "github.com/apache/servicecomb-service-center/server/core" pb "github.com/apache/servicecomb-service-center/server/core/proto" + "golang.org/x/net/context" "time" ) -func HandleWatchJob(watcher *ListWatcher, stream pb.ServiceInstanceCtrl_WatchServer) (err error) { - timer := time.NewTimer(DEFAULT_HEARTBEAT_INTERVAL) +func HandleWatchJob(watcher *InstanceEventListWatcher, stream pb.ServiceInstanceCtrl_WatchServer) (err error) { + timer := time.NewTimer(HeartbeatTimeout) defer timer.Stop() for { select { case <-timer.C: - timer.Reset(DEFAULT_HEARTBEAT_INTERVAL) + timer.Reset(HeartbeatTimeout) // TODO grpc 长连接心跳? case job := <-watcher.Job: @@ -52,7 +54,17 @@ func HandleWatchJob(watcher *ListWatcher, stream pb.ServiceInstanceCtrl_WatchSer return } - util.ResetTimer(timer, DEFAULT_HEARTBEAT_INTERVAL) + util.ResetTimer(timer, HeartbeatTimeout) } } } + +func DoStreamListAndWatch(ctx context.Context, serviceId string, f func() ([]*pb.WatchInstanceResponse, int64), stream pb.ServiceInstanceCtrl_WatchServer) error { + domainProject := util.ParseDomainProject(ctx) + watcher := NewInstanceEventListWatcher(serviceId, apt.GetInstanceRootKey(domainProject)+"/", f) + err := NotifyCenter().AddSubscriber(watcher) + if err != nil { + return err + } + return HandleWatchJob(watcher, stream) +} diff --git a/server/service/notification/stream_test.go b/server/notify/stream_test.go similarity index 68% rename from server/service/notification/stream_test.go rename to server/notify/stream_test.go index 5193045..e99ef3e 100644 --- a/server/service/notification/stream_test.go +++ b/server/notify/stream_test.go @@ -14,21 +14,30 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package notification +package notify -import "testing" +import ( + "github.com/apache/servicecomb-service-center/pkg/log" + "golang.org/x/net/context" + "testing" +) func TestHandleWatchJob(t *testing.T) { - defer func() { recover() }() - w := NewListWatcher("g", "s", nil) + defer log.Recover() + w := NewInstanceEventListWatcher("g", "s", nil) w.Job <- nil err := HandleWatchJob(w, nil) if err == nil { t.Fatalf("TestHandleWatchJob failed") } - w.Job <- NewWatchJob("g", "s", 1, nil) + w.Job <- NewInstanceEvent("g", "s", 1, nil) err = HandleWatchJob(w, nil) - if err != nil { - t.Fatalf("TestHandleWatchJob failed") + t.Fatalf("TestHandleWatchJob failed") +} + +func TestDoStreamListAndWatch(t *testing.T) { + err := DoStreamListAndWatch(context.Background(), "s", nil, nil) + if err == nil { + t.Fatal("TestDoStreamListAndWatch failed", err) } } diff --git a/server/service/notification/websocket.go b/server/notify/websocket.go similarity index 95% rename from server/service/notification/websocket.go rename to server/notify/websocket.go index 07db6f6..64f6203 100644 --- a/server/service/notification/websocket.go +++ b/server/notify/websocket.go @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package notification +package notify import ( "encoding/json" @@ -34,14 +34,14 @@ type WebSocket struct { ticker *time.Ticker conn *websocket.Conn // watcher subscribe the notification service event - watcher *ListWatcher + watcher *InstanceEventListWatcher needPingWatcher bool free chan struct{} closed chan struct{} } func (wh *WebSocket) Init() error { - wh.ticker = time.NewTicker(DEFAULT_HEARTBEAT_INTERVAL) + wh.ticker = time.NewTicker(HeartbeatTimeout) wh.needPingWatcher = true wh.free = make(chan struct{}, 1) wh.closed = make(chan struct{}) @@ -51,7 +51,7 @@ func (wh *WebSocket) Init() error { remoteAddr := wh.conn.RemoteAddr().String() // put in notification service queue - if err := GetNotifyService().AddSubscriber(wh.watcher); err != nil { + if err := NotifyCenter().AddSubscriber(wh.watcher); err != nil { err = fmt.Errorf("establish[%s] websocket watch failed: notify service error, %s", remoteAddr, err.Error()) log.Errorf(nil, err.Error()) @@ -72,7 +72,7 @@ func (wh *WebSocket) Init() error { } func (wh *WebSocket) Timeout() time.Duration { - return DEFAULT_SEND_TIMEOUT + return SendTimeout } func (wh *WebSocket) heartbeat(messageType int) error { @@ -194,8 +194,8 @@ func (wh *WebSocket) HandleWatchWebSocketJob(o interface{}) { remoteAddr, wh.watcher.Subject(), wh.watcher.Group()) wh.heartbeat(websocket.PingMessage) return - case *WatchJob: - job := o.(*WatchJob) + case *InstanceEvent: + job := o.(*InstanceEvent) resp := job.Response providerFlag := fmt.Sprintf("%s/%s/%s", resp.Key.AppId, resp.Key.ServiceName, resp.Key.Version) @@ -253,7 +253,7 @@ func DoWebSocketListAndWatch(ctx context.Context, serviceId string, f func() ([] socket := &WebSocket{ ctx: ctx, conn: conn, - watcher: NewListWatcher(serviceId, apt.GetInstanceRootKey(domainProject)+"/", f), + watcher: NewInstanceEventListWatcher(serviceId, apt.GetInstanceRootKey(domainProject)+"/", f), } process(socket) } diff --git a/server/service/notification/websocket_test.go b/server/notify/websocket_test.go similarity index 90% rename from server/service/notification/websocket_test.go rename to server/notify/websocket_test.go index 403a2e3..a8b61e5 100644 --- a/server/service/notification/websocket_test.go +++ b/server/notify/websocket_test.go @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package notification +package notify import _ "github.com/apache/servicecomb-service-center/server/init" import ( @@ -56,31 +56,12 @@ func (h *watcherConn) ServeHTTP(w http.ResponseWriter, r *http.Request) { func TestDoWebSocketListAndWatch(t *testing.T) { s := httptest.NewServer(&watcherConn{}) - GetNotifyService().Start() - conn, _, _ := websocket.DefaultDialer.Dial( strings.Replace(s.URL, "http://", "ws://", 1), nil) - go func() { - DoWebSocketListAndWatch(context.Background(), "", nil, conn) - - w2 := NewListWatcher("g", "s", func() (results []*proto.WatchInstanceResponse, rev int64) { - return - }) - ws2 := &WebSocket{ - ctx: context.Background(), - conn: conn, - watcher: w2, - } - err := ws2.Init() - if err != nil { - t.Fatalf("TestPublisher_Run") - } - }() - EstablishWebSocketError(conn, errors.New("error")) - w := NewListWatcher("g", "s", func() (results []*proto.WatchInstanceResponse, rev int64) { + w := NewInstanceEventListWatcher("g", "s", func() (results []*proto.WatchInstanceResponse, rev int64) { results = append(results, &proto.WatchInstanceResponse{ Response: proto.CreateResponse(proto.Response_SUCCESS, "ok"), Action: string(proto.EVT_CREATE), @@ -89,7 +70,6 @@ func TestDoWebSocketListAndWatch(t *testing.T) { }) return }) - w.nType = -1 ws := &WebSocket{ ctx: context.Background(), @@ -101,7 +81,25 @@ func TestDoWebSocketListAndWatch(t *testing.T) { t.Fatalf("TestPublisher_Run") } - w.nType = INSTANCE + NotifyCenter().Start() + + go func() { + DoWebSocketListAndWatch(context.Background(), "", nil, conn) + + w2 := NewInstanceEventListWatcher("g", "s", func() (results []*proto.WatchInstanceResponse, rev int64) { + return + }) + ws2 := &WebSocket{ + ctx: context.Background(), + conn: conn, + watcher: w2, + } + err := ws2.Init() + if err != nil { + t.Fatalf("TestPublisher_Run") + } + }() + err = ws.Init() if err != nil { t.Fatalf("TestPublisher_Run") @@ -109,9 +107,9 @@ func TestDoWebSocketListAndWatch(t *testing.T) { go ws.HandleWatchWebSocketControlMessage() w.OnMessage(nil) - w.OnMessage(&WatchJob{}) + w.OnMessage(&InstanceEvent{}) - GetNotifyService().AddJob(NewWatchJob("g", "s", 1, &proto.WatchInstanceResponse{ + NotifyCenter().Publish(NewInstanceEvent("g", "s", 1, &proto.WatchInstanceResponse{ Response: proto.CreateResponse(proto.Response_SUCCESS, "ok"), Action: string(proto.EVT_CREATE), Key: &proto.MicroServiceKey{}, diff --git a/server/server.go b/server/server.go index 5d28e74..f7af7b7 100644 --- a/server/server.go +++ b/server/server.go @@ -21,11 +21,12 @@ import ( "fmt" "github.com/apache/servicecomb-service-center/pkg/gopool" "github.com/apache/servicecomb-service-center/pkg/log" + nf "github.com/apache/servicecomb-service-center/pkg/notify" "github.com/apache/servicecomb-service-center/server/core" "github.com/apache/servicecomb-service-center/server/core/backend" "github.com/apache/servicecomb-service-center/server/mux" + "github.com/apache/servicecomb-service-center/server/notify" "github.com/apache/servicecomb-service-center/server/plugin" - nf "github.com/apache/servicecomb-service-center/server/service/notification" serviceUtil "github.com/apache/servicecomb-service-center/server/service/util" "github.com/apache/servicecomb-service-center/version" "github.com/astaxie/beego" @@ -132,7 +133,7 @@ func (s *ServiceCenterServer) compactBackendService() { func (s *ServiceCenterServer) initialize() { s.store = backend.Store() - s.notifyService = nf.GetNotifyService() + s.notifyService = notify.NotifyCenter() s.apiServer = GetAPIServer() s.goroutine = gopool.New(context.Background()) diff --git a/server/service/event/instance_event_handler.go b/server/service/event/instance_event_handler.go index d526d74..d1c81ee 100644 --- a/server/service/event/instance_event_handler.go +++ b/server/service/event/instance_event_handler.go @@ -22,10 +22,10 @@ import ( apt "github.com/apache/servicecomb-service-center/server/core" "github.com/apache/servicecomb-service-center/server/core/backend" pb "github.com/apache/servicecomb-service-center/server/core/proto" + "github.com/apache/servicecomb-service-center/server/notify" "github.com/apache/servicecomb-service-center/server/plugin/pkg/discovery" "github.com/apache/servicecomb-service-center/server/service/cache" "github.com/apache/servicecomb-service-center/server/service/metrics" - nf "github.com/apache/servicecomb-service-center/server/service/notification" serviceUtil "github.com/apache/servicecomb-service-center/server/service/util" "golang.org/x/net/context" "strings" @@ -59,7 +59,7 @@ func (h *InstanceEventHandler) OnEvent(evt discovery.KvEvent) { } } - if nf.GetNotifyService().Closed() { + if notify.NotifyCenter().Closed() { log.Warnf("caught [%s] instance[%s/%s] event, but notify service is closed", action, providerId, providerInstanceId) return @@ -110,7 +110,7 @@ func PublishInstanceEvent(domainProject string, action pb.EventType, serviceKey } for _, consumerId := range subscribers { // TODO add超时怎么处理? - job := nf.NewWatchJob(consumerId, apt.GetInstanceRootKey(domainProject)+"/", rev, response) - nf.GetNotifyService().AddJob(job) + job := notify.NewInstanceEvent(consumerId, apt.GetInstanceRootKey(domainProject)+"/", rev, response) + notify.NotifyCenter().Publish(job) } } diff --git a/server/service/event/rule_event_handler.go b/server/service/event/rule_event_handler.go index 095dd0e..58051d9 100644 --- a/server/service/event/rule_event_handler.go +++ b/server/service/event/rule_event_handler.go @@ -23,8 +23,8 @@ import ( "github.com/apache/servicecomb-service-center/server/core" "github.com/apache/servicecomb-service-center/server/core/backend" pb "github.com/apache/servicecomb-service-center/server/core/proto" + "github.com/apache/servicecomb-service-center/server/notify" "github.com/apache/servicecomb-service-center/server/plugin/pkg/discovery" - nf "github.com/apache/servicecomb-service-center/server/service/notification" serviceUtil "github.com/apache/servicecomb-service-center/server/service/util" "golang.org/x/net/context" ) @@ -92,7 +92,7 @@ func (h *RuleEventHandler) OnEvent(evt discovery.KvEvent) { } providerId, ruleId, domainProject := core.GetInfoFromRuleKV(evt.KV.Key) - if nf.GetNotifyService().Closed() { + if notify.NotifyCenter().Closed() { log.Warnf("caught [%s] service rule[%s/%s] event, but notify service is closed", action, providerId, ruleId) return diff --git a/server/service/event/tag_event_handler.go b/server/service/event/tag_event_handler.go index fb7ea03..7059e8f 100644 --- a/server/service/event/tag_event_handler.go +++ b/server/service/event/tag_event_handler.go @@ -23,9 +23,9 @@ import ( "github.com/apache/servicecomb-service-center/server/core" "github.com/apache/servicecomb-service-center/server/core/backend" pb "github.com/apache/servicecomb-service-center/server/core/proto" + "github.com/apache/servicecomb-service-center/server/notify" "github.com/apache/servicecomb-service-center/server/plugin/pkg/discovery" "github.com/apache/servicecomb-service-center/server/service/cache" - nf "github.com/apache/servicecomb-service-center/server/service/notification" serviceUtil "github.com/apache/servicecomb-service-center/server/service/util" "golang.org/x/net/context" ) @@ -106,7 +106,7 @@ func (h *TagEventHandler) OnEvent(evt discovery.KvEvent) { consumerId, domainProject := core.GetInfoFromTagKV(evt.KV.Key) - if nf.GetNotifyService().Closed() { + if notify.NotifyCenter().Closed() { log.Warnf("caught [%s] service tags[%s/%s] event, but notify service is closed", action, consumerId, evt.KV.Value) return diff --git a/server/service/watch.go b/server/service/watch.go index 4e1a629..8b4c039 100644 --- a/server/service/watch.go +++ b/server/service/watch.go @@ -20,9 +20,8 @@ import ( "errors" "github.com/apache/servicecomb-service-center/pkg/log" "github.com/apache/servicecomb-service-center/pkg/util" - apt "github.com/apache/servicecomb-service-center/server/core" pb "github.com/apache/servicecomb-service-center/server/core/proto" - nf "github.com/apache/servicecomb-service-center/server/service/notification" + "github.com/apache/servicecomb-service-center/server/notify" serviceUtil "github.com/apache/servicecomb-service-center/server/service/util" "github.com/gorilla/websocket" "golang.org/x/net/context" @@ -40,34 +39,31 @@ func (s *InstanceService) WatchPreOpera(ctx context.Context, in *pb.WatchInstanc } func (s *InstanceService) Watch(in *pb.WatchInstanceRequest, stream pb.ServiceInstanceCtrl_WatchServer) error { - var err error - if err = s.WatchPreOpera(stream.Context(), in); err != nil { + log.Infof("new a stream list and watch with service[%s]", in.SelfServiceId) + if err := s.WatchPreOpera(stream.Context(), in); err != nil { log.Errorf(err, "service[%s] establish watch failed: invalid params", in.SelfServiceId) return err } - domainProject := util.ParseDomainProject(stream.Context()) - watcher := nf.NewListWatcher(in.SelfServiceId, apt.GetInstanceRootKey(domainProject)+"/", nil) - err = nf.GetNotifyService().AddSubscriber(watcher) - log.Infof("watcher[%s/%s] start watch instance status", watcher.Subject(), watcher.Group()) - return nf.HandleWatchJob(watcher, stream) + + return notify.DoStreamListAndWatch(stream.Context(), in.SelfServiceId, nil, stream) } func (s *InstanceService) WebSocketWatch(ctx context.Context, in *pb.WatchInstanceRequest, conn *websocket.Conn) { log.Infof("new a web socket watch with service[%s]", in.SelfServiceId) if err := s.WatchPreOpera(ctx, in); err != nil { - nf.EstablishWebSocketError(conn, err) + notify.EstablishWebSocketError(conn, err) return } - nf.DoWebSocketListAndWatch(ctx, in.SelfServiceId, nil, conn) + notify.DoWebSocketListAndWatch(ctx, in.SelfServiceId, nil, conn) } func (s *InstanceService) WebSocketListAndWatch(ctx context.Context, in *pb.WatchInstanceRequest, conn *websocket.Conn) { log.Infof("new a web socket list and watch with service[%s]", in.SelfServiceId) if err := s.WatchPreOpera(ctx, in); err != nil { - nf.EstablishWebSocketError(conn, err) + notify.EstablishWebSocketError(conn, err) return } - nf.DoWebSocketListAndWatch(ctx, in.SelfServiceId, func() ([]*pb.WatchInstanceResponse, int64) { + notify.DoWebSocketListAndWatch(ctx, in.SelfServiceId, func() ([]*pb.WatchInstanceResponse, int64) { return serviceUtil.QueryAllProvidersInstances(ctx, in.SelfServiceId) }, conn) }
