This is an automated email from the ASF dual-hosted git repository.
tianxiaoliang pushed a commit to branch v1.x
in repository https://gitbox.apache.org/repos/asf/servicecomb-service-center.git
The following commit(s) were added to refs/heads/v1.x by this push:
new b124ab5 Issue: Refactor event bus (#976)
b124ab5 is described below
commit b124ab50eaf7bcecc01ffb1ce6da092685de9d0b
Author: little-cui <[email protected]>
AuthorDate: Thu May 13 09:52:34 2021 +0800
Issue: Refactor event bus (#976)
* SCB-2094 Refactor notification center (#741)
* SCB-2094 Refactor notification center
* SCB-2094 Add pkg description
(cherry picked from commit 600b32303ad6fbda4cdebe29737bad8bb8978bdc)
* SCB-2176 Pick master branch
* SCB-2176 Refactor event bus
---
pkg/{notify/processor.go => event/bus.go} | 58 ++++++++++---------
.../bus_service.go} | 67 +++++++++++-----------
.../bus_service_test.go} | 12 ++--
.../processor_test.go => event/bus_test.go} | 12 ++--
pkg/{notify => event}/common.go | 4 +-
pkg/{notify/notice.go => event/event.go} | 2 +-
pkg/{notify/notice_test.go => event/event_test.go} | 6 +-
pkg/{notify/subject.go => event/poster.go} | 41 +++++++------
.../subject_test.go => event/poster_test.go} | 16 +++---
pkg/{notify => event}/subscriber.go | 28 ++++-----
.../subscriber_checker.go} | 32 +++++------
pkg/{notify/group.go => event/subscriber_group.go} | 38 ++++++------
.../subscriber_group_test.go} | 25 ++++----
pkg/{notify => event}/types.go | 6 +-
pkg/{notify => event}/types_test.go | 4 +-
pkg/queue/taskqueue.go | 6 +-
pkg/queue/taskqueue_test.go | 10 ++--
pkg/notify/common.go => server/alarm/center.go | 15 +++--
server/alarm/common.go | 4 +-
server/alarm/model/types.go | 2 +-
server/alarm/service.go | 17 ++----
.../common.go => server/connection/connection.go | 12 ++--
server/{notify => connection/grpc}/stream.go | 56 +++++++++---------
server/{notify => connection/grpc}/stream_test.go | 14 +++--
server/{notify => connection}/metrics.go | 36 ++++++------
server/{notify => connection/ws}/publisher.go | 8 ++-
server/{notify => connection/ws}/websocket.go | 63 +++++++++++---------
server/{notify => connection/ws}/websocket_test.go | 50 +++++++---------
server/{notify => event}/center.go | 16 +++---
.../instance_subscriber.go} | 29 ++++++----
server/health/health_test.go | 4 +-
server/notify/common.go | 29 ----------
server/plugin/discovery/k8s/adaptor/listwatcher.go | 6 +-
server/server.go | 20 +++----
server/service/event/instance_event_handler.go | 10 ++--
server/service/event/rule_event_handler.go | 4 +-
server/service/event/tag_event_handler.go | 4 +-
server/service/watch.go | 13 +++--
38 files changed, 386 insertions(+), 393 deletions(-)
diff --git a/pkg/notify/processor.go b/pkg/event/bus.go
similarity index 54%
rename from pkg/notify/processor.go
rename to pkg/event/bus.go
index 5b24885..3735b0a 100644
--- a/pkg/notify/processor.go
+++ b/pkg/event/bus.go
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package notify
+package event
import (
"context"
@@ -23,74 +23,76 @@ import (
"github.com/apache/servicecomb-service-center/pkg/util"
)
-type Processor struct {
+// Bus can fire the event aync and dispatch events to subscriber according to subject
+type Bus struct {
*queue.TaskQueue
name string
subjects *util.ConcurrentMap
}
-func (p *Processor) Name() string {
- return p.name
+func (bus *Bus) Name() string {
+ return bus.name
}
-func (p *Processor) Accept(job Event) {
- p.Add(queue.Task{Object: job})
+func (bus *Bus) Fire(evt Event) {
+ // TODO add option if queue is full
+ bus.Add(queue.Task{Payload: evt})
}
-func (p *Processor) Handle(ctx context.Context, obj interface{}) {
- p.Notify(obj.(Event))
+func (bus *Bus) Handle(ctx context.Context, evt interface{}) {
+ bus.fireAtOnce(evt.(Event))
}
-func (p *Processor) Notify(job Event) {
- if itf, ok := p.subjects.Get(job.Subject()); ok {
- itf.(*Subject).Notify(job)
+func (bus *Bus) fireAtOnce(evt Event) {
+ if itf, ok := bus.subjects.Get(evt.Subject()); ok {
+ itf.(*Poster).Post(evt)
}
}
-func (p *Processor) Subjects(name string) *Subject {
- itf, ok := p.subjects.Get(name)
+func (bus *Bus) Subjects(name string) *Poster {
+ itf, ok := bus.subjects.Get(name)
if !ok {
return nil
}
- return itf.(*Subject)
+ return itf.(*Poster)
}
-func (p *Processor) AddSubscriber(n Subscriber) {
- item, _ := p.subjects.Fetch(n.Subject(), func() (interface{}, error) {
- return NewSubject(n.Subject()), nil
+func (bus *Bus) AddSubscriber(n Subscriber) {
+ item, _ := bus.subjects.Fetch(n.Subject(), func() (interface{}, error) {
+ return NewPoster(n.Subject()), nil
})
- item.(*Subject).GetOrNewGroup(n.Group()).AddSubscriber(n)
+ item.(*Poster).GetOrNewGroup(n.Group()).AddMember(n)
}
-func (p *Processor) Remove(n Subscriber) {
- itf, ok := p.subjects.Get(n.Subject())
+func (bus *Bus) RemoveSubscriber(n Subscriber) {
+ itf, ok := bus.subjects.Get(n.Subject())
if !ok {
return
}
- s := itf.(*Subject)
+ s := itf.(*Poster)
g := s.Groups(n.Group())
if g == nil {
return
}
- g.Remove(n.ID())
+ g.RemoveMember(n.ID())
if g.Size() == 0 {
- s.Remove(g.Name())
+ s.RemoveGroup(g.Name())
}
if s.Size() == 0 {
- p.subjects.Remove(s.Name())
+ bus.subjects.Remove(s.Subject())
}
}
-func (p *Processor) Clear() {
- p.subjects.Clear()
+func (bus *Bus) Clear() {
+ bus.subjects.Clear()
}
-func NewProcessor(name string, queueSize int) *Processor {
- p := &Processor{
+func NewBus(name string, queueSize int) *Bus {
+ p := &Bus{
TaskQueue: queue.NewTaskQueue(queueSize),
name: name,
subjects: util.NewConcurrentMap(0),
diff --git a/pkg/notify/notification_service.go b/pkg/event/bus_service.go
similarity index 64%
rename from pkg/notify/notification_service.go
rename to pkg/event/bus_service.go
index 658d6ab..b5c8cd2 100644
--- a/pkg/notify/notification_service.go
+++ b/pkg/event/bus_service.go
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package notify
+package event
import (
"errors"
@@ -24,15 +24,18 @@ import (
"sync"
)
-type Service struct {
- processors map[Type]*Processor
- mux sync.RWMutex
- isClose bool
+// BusService is the daemon service to manage multiple type Bus
+// and wrap handle methods of Bus
+type BusService struct {
+ // buses is the map of event handler, key is event source type
+ buses map[Type]*Bus
+ mux sync.RWMutex
+ isClose bool
}
-func (s *Service) newProcessor(t Type) *Processor {
+func (s *BusService) newBus(t Type) *Bus {
s.mux.RLock()
- p, ok := s.processors[t]
+ p, ok := s.buses[t]
if ok {
s.mux.RUnlock()
return p
@@ -40,20 +43,20 @@ func (s *Service) newProcessor(t Type) *Processor {
s.mux.RUnlock()
s.mux.Lock()
- p, ok = s.processors[t]
+ p, ok = s.buses[t]
if ok {
s.mux.Unlock()
return p
}
- p = NewProcessor(t.String(), t.QueueSize())
- s.processors[t] = p
+ p = NewBus(t.String(), t.QueueSize())
+ s.buses[t] = p
s.mux.Unlock()
p.Run()
return p
}
-func (s *Service) Start() {
+func (s *BusService) Start() {
if !s.Closed() {
log.Warnf("notify service is already running")
return
@@ -63,7 +66,7 @@ func (s *Service) Start() {
s.mux.Unlock()
// 错误subscriber清理
- err := s.AddSubscriber(NewNotifyServiceHealthChecker())
+ err := s.AddSubscriber(NewSubscriberHealthChecker())
if err != nil {
log.Error("", err)
}
@@ -71,7 +74,7 @@ func (s *Service) Start() {
log.Debugf("notify service is started")
}
-func (s *Service) AddSubscriber(n Subscriber) error {
+func (s *BusService) AddSubscriber(n Subscriber) error {
if n == nil {
err := errors.New("required Subscriber")
log.Errorf(err, "add subscriber failed")
@@ -84,30 +87,30 @@ func (s *Service) AddSubscriber(n Subscriber) error {
return err
}
- p := s.newProcessor(n.Type())
- n.SetService(s)
+ p := s.newBus(n.Type())
+ n.SetBus(s)
n.OnAccept()
p.AddSubscriber(n)
return nil
}
-func (s *Service) RemoveSubscriber(n Subscriber) {
+func (s *BusService) RemoveSubscriber(n Subscriber) {
s.mux.RLock()
- p, ok := s.processors[n.Type()]
+ p, ok := s.buses[n.Type()]
if !ok {
s.mux.RUnlock()
return
}
s.mux.RUnlock()
- p.Remove(n)
+ p.RemoveSubscriber(n)
n.Close()
}
-func (s *Service) stopProcessors() {
+func (s *BusService) closeBuses() {
s.mux.RLock()
- for _, p := range s.processors {
+ for _, p := range s.buses {
p.Clear()
p.Stop()
}
@@ -115,30 +118,30 @@ func (s *Service) stopProcessors() {
}
//通知内容塞到队列里
-func (s *Service) Publish(job Event) error {
+func (s *BusService) Fire(evt Event) error {
if s.Closed() {
- return errors.New("add notify job failed for server shutdown")
+ return errors.New("add notify evt failed for server shutdown")
}
s.mux.RLock()
- p, ok := s.processors[job.Type()]
+ bus, ok := s.buses[evt.Type()]
if !ok {
s.mux.RUnlock()
- return fmt.Errorf("unknown job type[%s]", job.Type())
+ return fmt.Errorf("unknown evt type[%s]", evt.Type())
}
s.mux.RUnlock()
- p.Accept(job)
+ bus.Fire(evt)
return nil
}
-func (s *Service) Closed() (b bool) {
+func (s *BusService) Closed() (b bool) {
s.mux.RLock()
b = s.isClose
s.mux.RUnlock()
return
}
-func (s *Service) Stop() {
+func (s *BusService) Stop() {
if s.Closed() {
return
}
@@ -146,14 +149,14 @@ func (s *Service) Stop() {
s.isClose = true
s.mux.Unlock()
- s.stopProcessors()
+ s.closeBuses()
log.Debug("notify service stopped")
}
-func NewNotifyService() *Service {
- return &Service{
- processors: make(map[Type]*Processor),
- isClose: true,
+func NewBusService() *BusService {
+ return &BusService{
+ buses: make(map[Type]*Bus),
+ isClose: true,
}
}
diff --git a/pkg/notify/notification_test.go b/pkg/event/bus_service_test.go
similarity index 88%
rename from pkg/notify/notification_test.go
rename to pkg/event/bus_service_test.go
index f2af609..752101d 100644
--- a/pkg/notify/notification_test.go
+++ b/pkg/event/bus_service_test.go
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package notify
+package event
import (
simple "github.com/apache/servicecomb-service-center/pkg/time"
@@ -26,7 +26,7 @@ import (
func TestGetNotifyService(t *testing.T) {
INSTANCE := RegisterType("INSTANCE", 1)
- notifyService := NewNotifyService()
+ notifyService := NewBusService()
if notifyService == nil {
t.Fatalf("TestGetNotifyService failed")
}
@@ -38,7 +38,7 @@ func TestGetNotifyService(t *testing.T) {
if err == nil {
t.Fatalf("TestGetNotifyService failed")
}
- err = notifyService.Publish(nil)
+ err = notifyService.Fire(nil)
if err == nil {
t.Fatalf("TestGetNotifyService failed")
}
@@ -62,15 +62,15 @@ func TestGetNotifyService(t *testing.T) {
t.Fatalf("TestGetNotifyService failed, %v", err)
}
j := &baseEvent{INSTANCE, "s", "g", simple.FromTime(time.Now())}
- err = notifyService.Publish(j)
+ err = notifyService.Fire(j)
if err != nil {
t.Fatalf("TestGetNotifyService failed")
}
- err = notifyService.Publish(NewNotifyServiceHealthCheckJob(NewNotifyServiceHealthChecker()))
+ err = notifyService.Fire(NewUnhealthyEvent(NewSubscriberHealthChecker()))
if err != nil {
t.Fatalf("TestGetNotifyService failed")
}
- err = notifyService.Publish(NewNotifyServiceHealthCheckJob(s))
+ err = notifyService.Fire(NewUnhealthyEvent(s))
if err != nil {
t.Fatalf("TestGetNotifyService failed")
}
diff --git a/pkg/notify/processor_test.go b/pkg/event/bus_test.go
similarity index 91%
rename from pkg/notify/processor_test.go
rename to pkg/event/bus_test.go
index 894231b..a5e214b 100644
--- a/pkg/notify/processor_test.go
+++ b/pkg/event/bus_test.go
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package notify
+package event
import (
"testing"
@@ -37,7 +37,7 @@ func TestProcessor_Do(t *testing.T) {
job: make(chan Event, 1)}
mock2 := &mockSubscriberChan{Subscriber: NewSubscriber(INSTANCE, "s1", "g2"),
job: make(chan Event, 1)}
- p := NewProcessor("p1", 0)
+ p := NewBus("p1", 0)
if p.Name() != "p1" {
t.Fatalf("TestProcessor_Do")
}
@@ -45,12 +45,12 @@ func TestProcessor_Do(t *testing.T) {
t.Fatalf("TestProcessor_Do")
}
p.AddSubscriber(mock1)
- if p.Subjects(mock1.Subject()).Groups(mock1.Group()).Subscribers(mock1.ID()) != mock1 {
+ if p.Subjects(mock1.Subject()).Groups(mock1.Group()).Member(mock1.ID()) != mock1 {
t.Fatalf("TestProcessor_Do")
}
- p.Remove(NewSubscriber(INSTANCE, "s2", "g1"))
- p.Remove(NewSubscriber(INSTANCE, "s1", "g2"))
- p.Remove(mock1)
+ p.RemoveSubscriber(NewSubscriber(INSTANCE, "s2", "g1"))
+ p.RemoveSubscriber(NewSubscriber(INSTANCE, "s1", "g2"))
+ p.RemoveSubscriber(mock1)
if p.Subjects(mock1.Subject()) != nil {
t.Fatalf("TestProcessor_Do")
}
diff --git a/pkg/notify/common.go b/pkg/event/common.go
similarity index 95%
copy from pkg/notify/common.go
copy to pkg/event/common.go
index c98e31c..3b2f115 100644
--- a/pkg/notify/common.go
+++ b/pkg/event/common.go
@@ -15,12 +15,12 @@
* limitations under the License.
*/
-package notify
+package event
const (
DefaultQueueSize = 1000
)
const (
- NOTIFTY Type = iota
+ INNER Type = iota
)
diff --git a/pkg/notify/notice.go b/pkg/event/event.go
similarity index 99%
rename from pkg/notify/notice.go
rename to pkg/event/event.go
index 8015d7d..eeac4ce 100644
--- a/pkg/notify/notice.go
+++ b/pkg/event/event.go
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package notify
+package event
import (
simple "github.com/apache/servicecomb-service-center/pkg/time"
diff --git a/pkg/notify/notice_test.go b/pkg/event/event_test.go
similarity index 89%
rename from pkg/notify/notice_test.go
rename to pkg/event/event_test.go
index 5ad78d2..ed181fb 100644
--- a/pkg/notify/notice_test.go
+++ b/pkg/event/event_test.go
@@ -13,7 +13,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
-package notify
+package event
import (
"fmt"
@@ -21,13 +21,13 @@ import (
)
func TestNewEventWithTime(t *testing.T) {
- evt := NewEvent(NOTIFTY, "a", "b")
+ evt := NewEvent(INNER, "a", "b")
if evt.CreateAt().UnixNano() == 0 {
t.Fatal("TestNewEventWithTime")
}
fmt.Println(evt.CreateAt())
- if evt.Type() != NOTIFTY || evt.Subject() != "a" || evt.Group() != "b" {
+ if evt.Type() != INNER || evt.Subject() != "a" || evt.Group() != "b" {
t.Fatal("TestNewEventWithTime")
}
}
diff --git a/pkg/notify/subject.go b/pkg/event/poster.go
similarity index 66%
rename from pkg/notify/subject.go
rename to pkg/event/poster.go
index 4f79ba9..4cb3f49 100644
--- a/pkg/notify/subject.go
+++ b/pkg/event/poster.go
@@ -15,25 +15,32 @@
* limitations under the License.
*/
-package notify
+package event
import (
"github.com/apache/servicecomb-service-center/pkg/util"
)
-type Subject struct {
- name string
- groups *util.ConcurrentMap
+// Poster post the events of specified subject to the subscribers
+type Poster struct {
+ subject string
+ groups *util.ConcurrentMap
}
-func (s *Subject) Name() string {
- return s.name
+func (s *Poster) Subject() string {
+ return s.subject
}
-func (s *Subject) Notify(job Event) {
+func (s *Poster) Post(job Event) {
+ f := func(g *Group) {
+ g.ForEach(func(m Subscriber) {
+ m.OnMessage(job)
+ })
+ }
+
if len(job.Group()) == 0 {
s.groups.ForEach(func(item util.MapItem) (next bool) {
- item.Value.(*Group).Notify(job)
+ f(item.Value.(*Group))
return true
})
return
@@ -43,10 +50,10 @@ func (s *Subject) Notify(job Event) {
if !ok {
return
}
- itf.(*Group).Notify(job)
+ f(itf.(*Group))
}
-func (s *Subject) Groups(name string) *Group {
+func (s *Poster) Groups(name string) *Group {
g, ok := s.groups.Get(name)
if !ok {
return nil
@@ -54,24 +61,24 @@ func (s *Subject) Groups(name string) *Group {
return g.(*Group)
}
-func (s *Subject) GetOrNewGroup(name string) *Group {
+func (s *Poster) GetOrNewGroup(name string) *Group {
item, _ := s.groups.Fetch(name, func() (interface{}, error) {
return NewGroup(name), nil
})
return item.(*Group)
}
-func (s *Subject) Remove(name string) {
+func (s *Poster) RemoveGroup(name string) {
s.groups.Remove(name)
}
-func (s *Subject) Size() int {
+func (s *Poster) Size() int {
return s.groups.Size()
}
-func NewSubject(name string) *Subject {
- return &Subject{
- name: name,
- groups: util.NewConcurrentMap(0),
+func NewPoster(subject string) *Poster {
+ return &Poster{
+ subject: subject,
+ groups: util.NewConcurrentMap(0),
}
}
diff --git a/pkg/notify/subject_test.go b/pkg/event/poster_test.go
similarity index 92%
rename from pkg/notify/subject_test.go
rename to pkg/event/poster_test.go
index c9e955c..931b5de 100644
--- a/pkg/notify/subject_test.go
+++ b/pkg/event/poster_test.go
@@ -14,13 +14,13 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package notify
+package event
import "testing"
func TestSubject_Fetch(t *testing.T) {
- s := NewSubject("s1")
- if s.Name() != "s1" {
+ s := NewPoster("s1")
+ if s.Subject() != "s1" {
t.Fatalf("TestSubject_Fetch failed")
}
g := s.GetOrNewGroup("g1")
@@ -37,7 +37,7 @@ func TestSubject_Fetch(t *testing.T) {
if s.Size() != 2 {
t.Fatalf("TestSubject_Fetch failed")
}
- s.Remove(o.Name())
+ s.RemoveGroup(o.Name())
if s.Groups("g2") != nil {
t.Fatalf("TestSubject_Fetch failed")
}
@@ -47,19 +47,19 @@ func TestSubject_Fetch(t *testing.T) {
INSTANCE := RegisterType("INSTANCE", 1)
mock1 := &mockSubscriber{Subscriber: NewSubscriber(INSTANCE, "s1", "g1")}
mock2 := &mockSubscriber{Subscriber: NewSubscriber(INSTANCE, "s1", "g2")}
- g.AddSubscriber(mock1)
+ g.AddMember(mock1)
job := &baseEvent{group: "g3"}
- s.Notify(job)
+ s.Post(job)
if mock1.job != nil || mock2.job != nil {
t.Fatalf("TestSubject_Fetch failed")
}
job.group = "g1"
- s.Notify(job)
+ s.Post(job)
if mock1.job != job || mock2.job != nil {
t.Fatalf("TestSubject_Fetch failed")
}
job.group = ""
- s.Notify(job)
+ s.Post(job)
if mock1.job != job && mock2.job != job {
t.Fatalf("TestSubject_Fetch failed")
}
diff --git a/pkg/notify/subscriber.go b/pkg/event/subscriber.go
similarity index 67%
rename from pkg/notify/subscriber.go
rename to pkg/event/subscriber.go
index 85eabcd..1a3048b 100644
--- a/pkg/notify/subscriber.go
+++ b/pkg/event/subscriber.go
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package notify
+package event
import (
"errors"
@@ -27,8 +27,8 @@ type Subscriber interface {
Subject() string
Group() string
Type() Type
- Service() *Service
- SetService(*Service)
+ Bus() *BusService
+ SetBus(*BusService)
Err() error
SetError(err error)
@@ -44,20 +44,20 @@ type baseSubscriber struct {
id string
subject string
group string
- service *Service
+ service *BusService
err error
}
-func (s *baseSubscriber) ID() string { return s.id }
-func (s *baseSubscriber) Subject() string { return s.subject }
-func (s *baseSubscriber) Group() string { return s.group }
-func (s *baseSubscriber) Type() Type { return s.nType }
-func (s *baseSubscriber) Service() *Service { return s.service }
-func (s *baseSubscriber) SetService(svc *Service) { s.service = svc }
-func (s *baseSubscriber) Err() error { return s.err }
-func (s *baseSubscriber) SetError(err error) { s.err = err }
-func (s *baseSubscriber) Close() {}
-func (s *baseSubscriber) OnAccept() {}
+func (s *baseSubscriber) ID() string { return s.id }
+func (s *baseSubscriber) Subject() string { return s.subject }
+func (s *baseSubscriber) Group() string { return s.group }
+func (s *baseSubscriber) Type() Type { return s.nType }
+func (s *baseSubscriber) Bus() *BusService { return s.service }
+func (s *baseSubscriber) SetBus(svc *BusService) { s.service = svc }
+func (s *baseSubscriber) Err() error { return s.err }
+func (s *baseSubscriber) SetError(err error) { s.err = err }
+func (s *baseSubscriber) Close() {}
+func (s *baseSubscriber) OnAccept() {}
func (s *baseSubscriber) OnMessage(job Event) {
s.SetError(errors.New("do not call base notifier OnMessage method"))
}
diff --git a/pkg/notify/notification_healthchecker.go b/pkg/event/subscriber_checker.go
similarity index 65%
rename from pkg/notify/notification_healthchecker.go
rename to pkg/event/subscriber_checker.go
index 2466d8c..28c4cb6 100644
--- a/pkg/notify/notification_healthchecker.go
+++ b/pkg/event/subscriber_checker.go
@@ -15,30 +15,30 @@
* limitations under the License.
*/
-package notify
+package event
import "github.com/apache/servicecomb-service-center/pkg/log"
const (
- ServerCheckerName = "__HealthChecker__"
- ServerCheckSubject = "__NotifyServerHealthCheck__"
+ CheckerGroup = "__HealthChecker__"
+ CheckerSubject = "__NotifyServerHealthCheck__"
)
-//Notifier 健康检查
-type ServiceHealthChecker struct {
+// SubscriberHealthChecker check subscriber health and remove it from bus if return Err
+type SubscriberHealthChecker struct {
Subscriber
}
-type ServiceHealthCheckJob struct {
+type UnhealthyEvent struct {
Event
ErrorSubscriber Subscriber
}
-func (s *ServiceHealthChecker) OnMessage(job Event) {
- j := job.(*ServiceHealthCheckJob)
+func (s *SubscriberHealthChecker) OnMessage(evt Event) {
+ j := evt.(*UnhealthyEvent)
err := j.ErrorSubscriber.Err()
- if j.ErrorSubscriber.Type() == NOTIFTY {
+ if j.ErrorSubscriber.Type() == INNER {
log.Errorf(nil, "remove %s watcher failed, here cause a dead lock, subject: %s, group: %s",
j.ErrorSubscriber.Type(), j.ErrorSubscriber.Subject(), j.ErrorSubscriber.Group())
return
@@ -46,18 +46,18 @@ func (s *ServiceHealthChecker) OnMessage(job Event) {
log.Debugf("notification service remove %s watcher, error: %v, subject: %s, group: %s",
j.ErrorSubscriber.Type(), err, j.ErrorSubscriber.Subject(), j.ErrorSubscriber.Group())
- s.Service().RemoveSubscriber(j.ErrorSubscriber)
+ s.Bus().RemoveSubscriber(j.ErrorSubscriber)
}
-func NewNotifyServiceHealthChecker() *ServiceHealthChecker {
- return &ServiceHealthChecker{
- Subscriber: NewSubscriber(NOTIFTY, ServerCheckSubject, ServerCheckerName),
+func NewSubscriberHealthChecker() *SubscriberHealthChecker {
+ return &SubscriberHealthChecker{
+ Subscriber: NewSubscriber(INNER, CheckerSubject, CheckerGroup),
}
}
-func NewNotifyServiceHealthCheckJob(s Subscriber) *ServiceHealthCheckJob {
- return &ServiceHealthCheckJob{
- Event: NewEvent(NOTIFTY, ServerCheckSubject, ServerCheckerName),
+func NewUnhealthyEvent(s Subscriber) *UnhealthyEvent {
+ return &UnhealthyEvent{
+ Event: NewEvent(INNER, CheckerSubject, CheckerGroup),
ErrorSubscriber: s,
}
}
diff --git a/pkg/notify/group.go b/pkg/event/subscriber_group.go
similarity index 65%
rename from pkg/notify/group.go
rename to pkg/event/subscriber_group.go
index c600bba..0c41f4a 100644
--- a/pkg/notify/group.go
+++ b/pkg/event/subscriber_group.go
@@ -15,51 +15,51 @@
* limitations under the License.
*/
-package notify
+package event
import (
"github.com/apache/servicecomb-service-center/pkg/util"
)
type Group struct {
- name string
- subscribers *util.ConcurrentMap
+ name string
+ members *util.ConcurrentMap
}
func (g *Group) Name() string {
return g.name
}
-func (g *Group) Notify(job Event) {
- g.subscribers.ForEach(func(item util.MapItem) (next bool) {
- item.Value.(Subscriber).OnMessage(job)
- return true
- })
-}
-
-func (g *Group) Subscribers(name string) Subscriber {
- s, ok := g.subscribers.Get(name)
+func (g *Group) Member(name string) Subscriber {
+ s, ok := g.members.Get(name)
if !ok {
return nil
}
return s.(Subscriber)
}
-func (g *Group) AddSubscriber(subscriber Subscriber) Subscriber {
- return g.subscribers.PutIfAbsent(subscriber.ID(), subscriber).(Subscriber)
+func (g *Group) ForEach(iter func(m Subscriber)) {
+ g.members.ForEach(func(item util.MapItem) (next bool) {
+ iter(item.Value.(Subscriber))
+ return true
+ })
+}
+
+func (g *Group) AddMember(subscriber Subscriber) Subscriber {
+ return g.members.PutIfAbsent(subscriber.ID(), subscriber).(Subscriber)
}
-func (g *Group) Remove(name string) {
- g.subscribers.Remove(name)
+func (g *Group) RemoveMember(name string) {
+ g.members.Remove(name)
}
func (g *Group) Size() int {
- return g.subscribers.Size()
+ return g.members.Size()
}
func NewGroup(name string) *Group {
return &Group{
- name: name,
- subscribers: util.NewConcurrentMap(0),
+ name: name,
+ members: util.NewConcurrentMap(0),
}
}
diff --git a/pkg/notify/group_test.go b/pkg/event/subscriber_group_test.go
similarity index 79%
rename from pkg/notify/group_test.go
rename to pkg/event/subscriber_group_test.go
index 81040a7..b0dc22d 100644
--- a/pkg/notify/group_test.go
+++ b/pkg/event/subscriber_group_test.go
@@ -14,9 +14,11 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package notify
+package event
-import "testing"
+import (
+ "testing"
+)
type mockSubscriber struct {
Subscriber
@@ -34,37 +36,32 @@ func TestGroup_Add(t *testing.T) {
if g.Name() != "g1" {
t.Fatalf("TestGroup_Add failed")
}
- if g.AddSubscriber(m) != m {
+ if g.AddMember(m) != m {
t.Fatalf("TestGroup_Add failed")
}
- if g.AddSubscriber(NewSubscriber(INSTANCE, "s1", "g1")) == m {
+ if g.AddMember(NewSubscriber(INSTANCE, "s1", "g1")) == m {
t.Fatalf("TestGroup_Add failed")
}
same := *(m.(*baseSubscriber))
- if g.AddSubscriber(&same) != m {
+ if g.AddMember(&same) != m {
t.Fatalf("TestGroup_Add failed")
}
if g.Size() != 2 {
t.Fatalf("TestGroup_Add failed")
}
- g.Remove(m.ID())
+ g.RemoveMember(m.ID())
if g.Size() != 1 {
t.Fatalf("TestGroup_Add failed")
}
- if g.Subscribers(m.ID()) == m {
+ if g.Member(m.ID()) == m {
t.Fatalf("TestGroup_Add failed")
}
mock := &mockSubscriber{Subscriber: NewSubscriber(INSTANCE, "s1", "g1")}
- if g.AddSubscriber(mock) != mock {
+ if g.AddMember(mock) != mock {
t.Fatalf("TestGroup_Add failed")
}
- if g.Subscribers(mock.ID()) != mock {
- t.Fatalf("TestGroup_Add failed")
- }
- job := &baseEvent{nType: INSTANCE}
- g.Notify(job)
- if mock.job != job {
+ if g.Member(mock.ID()) != mock {
t.Fatalf("TestGroup_Add failed")
}
}
diff --git a/pkg/notify/types.go b/pkg/event/types.go
similarity index 96%
rename from pkg/notify/types.go
rename to pkg/event/types.go
index d6cba5d..d42377b 100644
--- a/pkg/notify/types.go
+++ b/pkg/event/types.go
@@ -13,7 +13,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
-package notify
+package event
import "strconv"
@@ -41,11 +41,11 @@ func (nt Type) IsValid() bool {
}
var typeNames = []string{
- NOTIFTY: "NOTIFTY",
+ INNER: "INNER",
}
var typeQueues = []int{
- NOTIFTY: 0,
+ INNER: 0,
}
func Types() (ts []Type) {
diff --git a/pkg/notify/types_test.go b/pkg/event/types_test.go
similarity index 93%
rename from pkg/notify/types_test.go
rename to pkg/event/types_test.go
index 720af81..35e5a66 100644
--- a/pkg/notify/types_test.go
+++ b/pkg/event/types_test.go
@@ -13,7 +13,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
-package notify
+package event
import "testing"
@@ -30,7 +30,7 @@ func TestRegisterType(t *testing.T) {
if id.String() != "Type999" || id.QueueSize() != DefaultQueueSize {
t.Fatal("TestRegisterType failed", id.String(), id.QueueSize())
}
- if NOTIFTY.String() != "NOTIFTY" || NOTIFTY.QueueSize() != DefaultQueueSize {
+ if INNER.String() != "INNER" || INNER.QueueSize() != DefaultQueueSize {
t.Fatal("TestRegisterType failed", id.String(), id.QueueSize())
}
}
diff --git a/pkg/queue/taskqueue.go b/pkg/queue/taskqueue.go
index e59cec5..b394802 100644
--- a/pkg/queue/taskqueue.go
+++ b/pkg/queue/taskqueue.go
@@ -29,7 +29,7 @@ type Worker interface {
}
type Task struct {
- Object interface{}
+ Payload interface{}
// Async can let workers handle this task concurrently, but
// it will make this task unordered
Async bool
@@ -61,13 +61,13 @@ func (q *TaskQueue) Do(ctx context.Context, task Task) {
if task.Async {
for _, w := range q.Workers {
q.goroutine.Do(func(ctx context.Context) {
- q.dispatch(ctx, w, task.Object)
+ q.dispatch(ctx, w, task.Payload)
})
}
return
}
for _, w := range q.Workers {
- q.dispatch(ctx, w, task.Object)
+ q.dispatch(ctx, w, task.Payload)
}
}
diff --git a/pkg/queue/taskqueue_test.go b/pkg/queue/taskqueue_test.go
index 8c79da4..d0eecd6 100644
--- a/pkg/queue/taskqueue_test.go
+++ b/pkg/queue/taskqueue_test.go
@@ -34,26 +34,26 @@ func TestNewEventQueue(t *testing.T) {
q := NewTaskQueue(0)
q.AddWorker(h)
- q.Do(context.Background(), Task{Object: 1})
+ q.Do(context.Background(), Task{Payload: 1})
if <-h.Object != 1 {
t.Fatalf("TestNewEventQueue failed")
}
- q.Do(context.Background(), Task{Object: 11, Async: true})
+ q.Do(context.Background(), Task{Payload: 11, Async: true})
if <-h.Object != 11 {
t.Fatalf("TestNewEventQueue failed")
}
q.Run()
- q.Add(Task{Object: 2})
+ q.Add(Task{Payload: 2})
if <-h.Object != 2 {
t.Fatalf("TestNewEventQueue failed")
}
- q.Add(Task{Object: 22, Async: true})
+ q.Add(Task{Payload: 22, Async: true})
if <-h.Object != 22 {
t.Fatalf("TestNewEventQueue failed")
}
q.Stop()
- q.Add(Task{Object: 3})
+ q.Add(Task{Payload: 3})
}
diff --git a/pkg/notify/common.go b/server/alarm/center.go
similarity index 88%
copy from pkg/notify/common.go
copy to server/alarm/center.go
index c98e31c..9af6aa8 100644
--- a/pkg/notify/common.go
+++ b/server/alarm/center.go
@@ -15,12 +15,11 @@
* limitations under the License.
*/
-package notify
+package alarm
-const (
- DefaultQueueSize = 1000
-)
-
-const (
- NOTIFTY Type = iota
-)
+func Center() *Service {
+ once.Do(func() {
+ service = NewAlarmService()
+ })
+ return service
+}
diff --git a/server/alarm/common.go b/server/alarm/common.go
index 9243d13..49f8fd0 100644
--- a/server/alarm/common.go
+++ b/server/alarm/common.go
@@ -17,7 +17,7 @@ package alarm
import (
"fmt"
- "github.com/apache/servicecomb-service-center/pkg/notify"
+ "github.com/apache/servicecomb-service-center/pkg/event"
"github.com/apache/servicecomb-service-center/server/alarm/model"
)
@@ -40,7 +40,7 @@ const (
Group = "__ALARM_GROUP__"
)
-var ALARM = notify.RegisterType("ALARM", 0)
+var ALARM = event.RegisterType("ALARM", 0)
func FieldBool(key string, v bool) model.Field {
return model.Field{Key: key, Value: v}
diff --git a/server/alarm/model/types.go b/server/alarm/model/types.go
index 0c29d79..40d0a44 100644
--- a/server/alarm/model/types.go
+++ b/server/alarm/model/types.go
@@ -16,7 +16,7 @@
package model
import (
- nf "github.com/apache/servicecomb-service-center/pkg/notify"
+ nf "github.com/apache/servicecomb-service-center/pkg/event"
"github.com/apache/servicecomb-service-center/pkg/util"
)
diff --git a/server/alarm/service.go b/server/alarm/service.go
index e399b91..3ef5e07 100644
--- a/server/alarm/service.go
+++ b/server/alarm/service.go
@@ -16,11 +16,11 @@
package alarm
import (
+ nf "github.com/apache/servicecomb-service-center/pkg/event"
"github.com/apache/servicecomb-service-center/pkg/log"
- nf "github.com/apache/servicecomb-service-center/pkg/notify"
"github.com/apache/servicecomb-service-center/pkg/util"
"github.com/apache/servicecomb-service-center/server/alarm/model"
- "github.com/apache/servicecomb-service-center/server/notify"
+ "github.com/apache/servicecomb-service-center/server/event"
"sync"
)
@@ -44,7 +44,7 @@ func (ac *Service) Raise(id model.ID, fields ...model.Field) error {
for _, f := range fields {
ae.Fields[f.Key] = f.Value
}
- return notify.GetNotifyCenter().Publish(ae)
+ return event.Center().Fire(ae)
}
func (ac *Service) Clear(id model.ID) error {
@@ -53,7 +53,7 @@ func (ac *Service) Clear(id model.ID) error {
Status: Cleared,
ID: id,
}
- return notify.GetNotifyCenter().Publish(ae)
+ return event.Center().Fire(ae)
}
func (ac *Service) ListAll() (ls []*model.AlarmEvent) {
@@ -88,16 +88,9 @@ func NewAlarmService() *Service {
c := &Service{
Subscriber: nf.NewSubscriber(ALARM, Subject, Group),
}
- err := notify.GetNotifyCenter().AddSubscriber(c)
+ err := event.Center().AddSubscriber(c)
if err != nil {
log.Error("", err)
}
return c
}
-
-func Center() *Service {
- once.Do(func() {
- service = NewAlarmService()
- })
- return service
-}
diff --git a/pkg/notify/common.go b/server/connection/connection.go
similarity index 75%
rename from pkg/notify/common.go
rename to server/connection/connection.go
index c98e31c..c34005f 100644
--- a/pkg/notify/common.go
+++ b/server/connection/connection.go
@@ -15,12 +15,14 @@
* limitations under the License.
*/
-package notify
+// connection pkg impl the pub/sub mechanism of the long connection of diff protocols
+package connection
-const (
- DefaultQueueSize = 1000
-)
+import "time"
const (
- NOTIFTY Type = iota
+ HeartbeatInterval = 30 * time.Second
+ ReadTimeout = HeartbeatInterval * 4
+ SendTimeout = 5 * time.Second
+ ReadMaxBody = 64
)
diff --git a/server/notify/stream.go b/server/connection/grpc/stream.go
similarity index 56%
rename from server/notify/stream.go
rename to server/connection/grpc/stream.go
index f382d19..23c56a3 100644
--- a/server/notify/stream.go
+++ b/server/connection/grpc/stream.go
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package notify
+package grpc
import (
"context"
@@ -23,22 +23,23 @@ import (
"github.com/apache/servicecomb-service-center/pkg/log"
pb "github.com/apache/servicecomb-service-center/pkg/registry"
"github.com/apache/servicecomb-service-center/pkg/util"
- apt "github.com/apache/servicecomb-service-center/server/core"
+ "github.com/apache/servicecomb-service-center/server/connection"
"github.com/apache/servicecomb-service-center/server/core/proto"
+ "github.com/apache/servicecomb-service-center/server/event"
"time"
)
-func HandleWatchJob(watcher *InstanceEventListWatcher, stream
proto.ServiceInstanceCtrl_WatchServer) (err error) {
- timer := time.NewTimer(HeartbeatInterval)
+const GRPC = "gRPC"
+
+func Handle(watcher *event.InstanceEventListWatcher, stream
proto.ServiceInstanceCtrl_WatchServer) (err error) {
+ timer := time.NewTimer(connection.HeartbeatInterval)
defer timer.Stop()
for {
select {
case <-stream.Context().Done():
return
case <-timer.C:
- timer.Reset(HeartbeatInterval)
-
- // TODO grpc 长连接心跳?
+ timer.Reset(connection.HeartbeatInterval)
case job := <-watcher.Job:
if job == nil {
err = errors.New("channel is closed")
@@ -46,37 +47,36 @@ func HandleWatchJob(watcher *InstanceEventListWatcher,
stream proto.ServiceInsta
watcher.Subject(), watcher.Group())
return
}
- if job.Response != nil {
- resp := job.Response
- log.Infof("event is coming in, watcher,
subject: %s, group: %s",
- watcher.Subject(), watcher.Group())
+ if job.Response == nil {
+ continue
+ }
+ resp := job.Response
+ log.Infof("event is coming in, watcher, subject: %s,
group: %s",
+ watcher.Subject(), watcher.Group())
- err = stream.Send(resp)
- if job != nil {
- ReportPublishCompleted(job, err)
- }
- if err != nil {
- log.Errorf(err, "send message error,
subject: %s, group: %s",
- watcher.Subject(),
watcher.Group())
- watcher.SetError(err)
- return
- }
- util.ResetTimer(timer, HeartbeatInterval)
+ err = stream.Send(resp)
+ connection.ReportPublishCompleted(job, err)
+ if err != nil {
+ log.Errorf(err, "send message error, subject:
%s, group: %s",
+ watcher.Subject(), watcher.Group())
+ watcher.SetError(err)
+ return
}
+ util.ResetTimer(timer, connection.HeartbeatInterval)
}
}
}
-func DoStreamListAndWatch(ctx context.Context, serviceID string, f func()
([]*pb.WatchInstanceResponse, int64), stream
proto.ServiceInstanceCtrl_WatchServer) (err error) {
+func ListAndWatch(ctx context.Context, serviceID string, f func()
([]*pb.WatchInstanceResponse, int64), stream
proto.ServiceInstanceCtrl_WatchServer) (err error) {
domainProject := util.ParseDomainProject(ctx)
domain := util.ParseDomain(ctx)
- watcher := NewInstanceEventListWatcher(serviceID,
apt.GetInstanceRootKey(domainProject)+"/", f)
- err = GetNotifyCenter().AddSubscriber(watcher)
+ watcher := event.NewInstanceEventListWatcher(serviceID, domainProject,
f)
+ err = event.Center().AddSubscriber(watcher)
if err != nil {
return
}
- ReportSubscriber(domain, GRPC, 1)
- err = HandleWatchJob(watcher, stream)
- ReportSubscriber(domain, GRPC, -1)
+ connection.ReportSubscriber(domain, GRPC, 1)
+ err = Handle(watcher, stream)
+ connection.ReportSubscriber(domain, GRPC, -1)
return
}
diff --git a/server/notify/stream_test.go
b/server/connection/grpc/stream_test.go
similarity index 77%
rename from server/notify/stream_test.go
rename to server/connection/grpc/stream_test.go
index 6d92cb5..b20fa5d 100644
--- a/server/notify/stream_test.go
+++ b/server/connection/grpc/stream_test.go
@@ -14,13 +14,15 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package notify
+package grpc_test
import (
"context"
"github.com/apache/servicecomb-service-center/pkg/log"
pb "github.com/apache/servicecomb-service-center/pkg/registry"
simple "github.com/apache/servicecomb-service-center/pkg/time"
+ stream
"github.com/apache/servicecomb-service-center/server/connection/grpc"
+ "github.com/apache/servicecomb-service-center/server/event"
"google.golang.org/grpc"
"testing"
"time"
@@ -39,19 +41,19 @@ func (x *grpcWatchServer) Context() context.Context {
}
func TestHandleWatchJob(t *testing.T) {
- w := NewInstanceEventListWatcher("g", "s", nil)
+ w := event.NewInstanceEventListWatcher("g", "s", nil)
w.Job <- nil
- err := HandleWatchJob(w, &grpcWatchServer{})
+ err := stream.Handle(w, &grpcWatchServer{})
if err == nil {
t.Fatalf("TestHandleWatchJob failed")
}
- w.Job <- NewInstanceEventWithTime("g", "s", 1,
simple.FromTime(time.Now()), nil)
+ w.Job <- event.NewInstanceEventWithTime("g", "s", 1,
simple.FromTime(time.Now()), nil)
w.Job <- nil
- HandleWatchJob(w, &grpcWatchServer{})
+ stream.Handle(w, &grpcWatchServer{})
}
func TestDoStreamListAndWatch(t *testing.T) {
defer log.Recover()
- err := DoStreamListAndWatch(context.Background(), "s", nil, nil)
+ err := stream.ListAndWatch(context.Background(), "s", nil, nil)
t.Fatal("TestDoStreamListAndWatch failed", err)
}
diff --git a/server/notify/metrics.go b/server/connection/metrics.go
similarity index 65%
rename from server/notify/metrics.go
rename to server/connection/metrics.go
index 6bde05b..76aa41e 100644
--- a/server/notify/metrics.go
+++ b/server/connection/metrics.go
@@ -1,22 +1,24 @@
-// Licensed to the Apache Software Foundation (ASF) under one or more
-// contributor license agreements. See the NOTICE file distributed with
-// this work for additional information regarding copyright ownership.
-// The ASF licenses this file to You under the Apache License, Version 2.0
-// (the "License"); you may not use this file except in compliance with
-// the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
-package notify
+package connection
import (
- "github.com/apache/servicecomb-service-center/pkg/notify"
+ "github.com/apache/servicecomb-service-center/pkg/event"
"github.com/apache/servicecomb-service-center/server/metric"
"github.com/prometheus/client_golang/prometheus"
"time"
@@ -58,7 +60,7 @@ func init() {
prometheus.MustRegister(notifyCounter, notifyLatency, subscriberGauge)
}
-func ReportPublishCompleted(evt notify.Event, err error) {
+func ReportPublishCompleted(evt event.Event, err error) {
instance := metric.InstanceName()
elapsed := float64(time.Since(evt.CreateAt()).Nanoseconds()) /
float64(time.Microsecond)
status := success
diff --git a/server/notify/publisher.go b/server/connection/ws/publisher.go
similarity index 96%
rename from server/notify/publisher.go
rename to server/connection/ws/publisher.go
index aa1c662..a8ee692 100644
--- a/server/notify/publisher.go
+++ b/server/connection/ws/publisher.go
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package notify
+package ws
import (
"context"
@@ -47,7 +47,7 @@ func (wh *Publisher) Stop() {
func (wh *Publisher) dispatch(ws *WebSocket, payload interface{}) {
wh.goroutine.Do(func(ctx context.Context) {
- ws.HandleWatchWebSocketJob(payload)
+ ws.HandleEvent(payload)
})
}
@@ -102,3 +102,7 @@ func NewPublisher() *Publisher {
goroutine: gopool.New(context.Background()),
}
}
+
+func Instance() *Publisher {
+ return publisher
+}
diff --git a/server/notify/websocket.go b/server/connection/ws/websocket.go
similarity index 84%
rename from server/notify/websocket.go
rename to server/connection/ws/websocket.go
index d31eb8d..d1d01a9 100644
--- a/server/notify/websocket.go
+++ b/server/connection/ws/websocket.go
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package notify
+package ws
import (
"context"
@@ -24,24 +24,28 @@ import (
"github.com/apache/servicecomb-service-center/pkg/log"
pb "github.com/apache/servicecomb-service-center/pkg/registry"
"github.com/apache/servicecomb-service-center/pkg/util"
+ "github.com/apache/servicecomb-service-center/server/connection"
+ "github.com/apache/servicecomb-service-center/server/event"
serviceUtil
"github.com/apache/servicecomb-service-center/server/service/util"
"github.com/gorilla/websocket"
"time"
)
+const Websocket = "Websocket"
+
type WebSocket struct {
ctx context.Context
ticker *time.Ticker
conn *websocket.Conn
// watcher subscribe the notification service event
- watcher *InstanceEventListWatcher
+ watcher *event.InstanceEventListWatcher
needPingWatcher bool
free chan struct{}
closed chan struct{}
}
func (wh *WebSocket) Init() error {
- wh.ticker = time.NewTicker(HeartbeatInterval)
+ wh.ticker = time.NewTicker(connection.HeartbeatInterval)
wh.needPingWatcher = true
wh.free = make(chan struct{}, 1)
wh.closed = make(chan struct{})
@@ -51,7 +55,7 @@ func (wh *WebSocket) Init() error {
remoteAddr := wh.conn.RemoteAddr().String()
// put in notification service queue
- if err := GetNotifyCenter().AddSubscriber(wh.watcher); err != nil {
+ if err := event.Center().AddSubscriber(wh.watcher); err != nil {
err = fmt.Errorf("establish[%s] websocket watch failed: notify
service error, %s",
remoteAddr, err.Error())
log.Errorf(nil, err.Error())
@@ -64,7 +68,7 @@ func (wh *WebSocket) Init() error {
}
// put in publisher queue
- publisher.Accept(wh)
+ Instance().Accept(wh)
log.Debugf("start watching instance status, watcher[%s], subject: %s,
group: %s",
remoteAddr, wh.watcher.Subject(), wh.watcher.Group())
@@ -72,14 +76,14 @@ func (wh *WebSocket) Init() error {
}
func (wh *WebSocket) ReadTimeout() time.Duration {
- return ReadTimeout
+ return connection.ReadTimeout
}
func (wh *WebSocket) SendTimeout() time.Duration {
- return SendTimeout
+ return connection.SendTimeout
}
-func (wh *WebSocket) heartbeat(messageType int) error {
+func (wh *WebSocket) Heartbeat(messageType int) error {
err := wh.conn.WriteControl(messageType, []byte{},
time.Now().Add(wh.SendTimeout()))
if err != nil {
messageTypeName := "Ping"
@@ -94,7 +98,7 @@ func (wh *WebSocket) heartbeat(messageType int) error {
return nil
}
-func (wh *WebSocket) HandleWatchWebSocketControlMessage() {
+func (wh *WebSocket) HandleControlMessage() {
remoteAddr := wh.conn.RemoteAddr().String()
// PING
wh.conn.SetPingHandler(func(message string) error {
@@ -109,7 +113,7 @@ func (wh *WebSocket) HandleWatchWebSocketControlMessage() {
message, remoteAddr, wh.watcher.Subject(),
wh.watcher.Group())
}
wh.needPingWatcher = false
- return wh.heartbeat(websocket.PongMessage)
+ return wh.Heartbeat(websocket.PongMessage)
})
// PONG
wh.conn.SetPongHandler(func(message string) error {
@@ -130,7 +134,7 @@ func (wh *WebSocket) HandleWatchWebSocketControlMessage() {
return wh.sendClose(code, text)
})
- wh.conn.SetReadLimit(ReadMaxBody)
+ wh.conn.SetReadLimit(connection.ReadMaxBody)
err := wh.conn.SetReadDeadline(time.Now().Add(wh.ReadTimeout()))
if err != nil {
log.Error("", err)
@@ -186,12 +190,11 @@ func (wh *WebSocket) Pick() interface{} {
return nil
}
-// HandleWatchWebSocketJob will be called if Pick() returns not nil
-func (wh *WebSocket) HandleWatchWebSocketJob(o interface{}) {
+// HandleEvent will be called if Pick() returns not nil
+func (wh *WebSocket) HandleEvent(o interface{}) {
defer wh.SetReady()
var (
- job *InstanceEvent
message []byte
remoteAddr = wh.conn.RemoteAddr().String()
)
@@ -213,7 +216,7 @@ func (wh *WebSocket) HandleWatchWebSocketJob(o interface{})
{
return
}
- if err := wh.heartbeat(websocket.PingMessage); err != nil {
+ if err := wh.Heartbeat(websocket.PingMessage); err != nil {
log.Errorf(err, "send 'Ping' message to watcher[%s]
failed, subject: %s, group: %s",
remoteAddr, wh.watcher.Subject(),
wh.watcher.Group())
return
@@ -222,7 +225,7 @@ func (wh *WebSocket) HandleWatchWebSocketJob(o interface{})
{
log.Debugf("send 'Ping' message to watcher[%s], subject: %s,
group: %s",
remoteAddr, wh.watcher.Subject(), wh.watcher.Group())
return
- case *InstanceEvent:
+ case *event.InstanceEvent:
resp := o.Response
providerFlag := fmt.Sprintf("%s/%s/%s", resp.Key.AppId,
resp.Key.ServiceName, resp.Key.Version)
@@ -254,8 +257,8 @@ func (wh *WebSocket) HandleWatchWebSocketJob(o interface{})
{
}
err := wh.WriteMessage(message)
- if job != nil {
- ReportPublishCompleted(job, err)
+ if evt, ok := o.(*event.InstanceEvent); ok {
+ connection.ReportPublishCompleted(evt, err)
}
if err != nil {
log.Errorf(err, "watcher[%s] catch an err, subject: %s, group:
%s",
@@ -286,18 +289,14 @@ func (wh *WebSocket) Stop() {
close(wh.closed)
}
-func DoWebSocketListAndWatch(ctx context.Context, serviceID string, f func()
([]*pb.WatchInstanceResponse, int64), conn *websocket.Conn) {
+func ListAndWatch(ctx context.Context, serviceID string, f func()
([]*pb.WatchInstanceResponse, int64), conn *websocket.Conn) {
domainProject := util.ParseDomainProject(ctx)
domain := util.ParseDomain(ctx)
- socket := &WebSocket{
- ctx: ctx,
- conn: conn,
- watcher: NewInstanceEventListWatcher(serviceID, domainProject,
f),
- }
+ socket := New(ctx, conn, event.NewInstanceEventListWatcher(serviceID,
domainProject, f))
- ReportSubscriber(domain, Websocket, 1)
+ connection.ReportSubscriber(domain, Websocket, 1)
process(socket)
- ReportSubscriber(domain, Websocket, -1)
+ connection.ReportSubscriber(domain, Websocket, -1)
}
func process(socket *WebSocket) {
@@ -305,15 +304,23 @@ func process(socket *WebSocket) {
return
}
- socket.HandleWatchWebSocketControlMessage()
+ socket.HandleControlMessage()
socket.Stop()
}
-func EstablishWebSocketError(conn *websocket.Conn, err error) {
+func SendEstablishError(conn *websocket.Conn, err error) {
remoteAddr := conn.RemoteAddr().String()
log.Errorf(err, "establish[%s] websocket watch failed.", remoteAddr)
if err := conn.WriteMessage(websocket.TextMessage,
util.StringToBytesWithNoCopy(err.Error())); err != nil {
log.Errorf(err, "establish[%s] websocket watch failed: write
message failed.", remoteAddr)
}
}
+
+func New(ctx context.Context, conn *websocket.Conn, watcher
*event.InstanceEventListWatcher) *WebSocket {
+ return &WebSocket{
+ ctx: ctx,
+ conn: conn,
+ watcher: watcher,
+ }
+}
diff --git a/server/notify/websocket_test.go
b/server/connection/ws/websocket_test.go
similarity index 71%
rename from server/notify/websocket_test.go
rename to server/connection/ws/websocket_test.go
index d9059f9..b7895eb 100644
--- a/server/notify/websocket_test.go
+++ b/server/connection/ws/websocket_test.go
@@ -14,16 +14,18 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package notify
+package ws_test
+// initialize
+import _ "github.com/apache/servicecomb-service-center/test"
import (
"context"
"errors"
"github.com/apache/servicecomb-service-center/pkg/registry"
+ wss "github.com/apache/servicecomb-service-center/server/connection/ws"
"github.com/apache/servicecomb-service-center/server/core"
"github.com/apache/servicecomb-service-center/server/core/proto"
- _
"github.com/apache/servicecomb-service-center/server/plugin/discovery/etcd"
- _
"github.com/apache/servicecomb-service-center/server/plugin/registry/buildin"
+ "github.com/apache/servicecomb-service-center/server/event"
"github.com/gorilla/websocket"
"net/http"
"net/http/httptest"
@@ -64,9 +66,9 @@ func TestDoWebSocketListAndWatch(t *testing.T) {
conn, _, _ := websocket.DefaultDialer.Dial(
strings.Replace(s.URL, "http://", "ws://", 1), nil)
- EstablishWebSocketError(conn, errors.New("error"))
+ wss.SendEstablishError(conn, errors.New("error"))
- w := NewInstanceEventListWatcher("g", "s", func() (results
[]*registry.WatchInstanceResponse, rev int64) {
+ w := event.NewInstanceEventListWatcher("g", "s", func() (results
[]*registry.WatchInstanceResponse, rev int64) {
results = append(results, &registry.WatchInstanceResponse{
Response: proto.CreateResponse(proto.Response_SUCCESS,
"ok"),
Action: string(registry.EVT_CREATE),
@@ -76,41 +78,33 @@ func TestDoWebSocketListAndWatch(t *testing.T) {
return
})
- ws := &WebSocket{
- ctx: context.Background(),
- conn: conn,
- watcher: w,
- }
+ ws := wss.New(context.Background(), conn, w)
err := ws.Init()
if err != nil {
t.Fatalf("TestPublisher_Run")
}
- GetNotifyCenter().Start()
+ event.Center().Start()
go func() {
- DoWebSocketListAndWatch(context.Background(), "", nil, conn)
+ wss.ListAndWatch(context.Background(), "", nil, conn)
- w2 := NewInstanceEventListWatcher("g", "s", func() (results
[]*registry.WatchInstanceResponse, rev int64) {
+ w2 := event.NewInstanceEventListWatcher("g", "s", func()
(results []*registry.WatchInstanceResponse, rev int64) {
return
})
- ws2 := &WebSocket{
- ctx: context.Background(),
- conn: conn,
- watcher: w2,
- }
+ ws2 := wss.New(context.Background(), conn, w2)
err := ws2.Init()
if err != nil {
t.Fatalf("TestPublisher_Run")
}
}()
- go ws.HandleWatchWebSocketControlMessage()
+ go ws.HandleControlMessage()
w.OnMessage(nil)
- w.OnMessage(&InstanceEvent{})
+ w.OnMessage(&event.InstanceEvent{})
- GetNotifyCenter().Publish(NewInstanceEvent("g", "s", 1,
&registry.WatchInstanceResponse{
+ event.Center().Fire(event.NewInstanceEvent("g", "s", 1,
&registry.WatchInstanceResponse{
Response: proto.CreateResponse(proto.Response_SUCCESS, "ok"),
Action: string(registry.EVT_CREATE),
Key: &registry.MicroServiceKey{},
@@ -119,21 +113,21 @@ func TestDoWebSocketListAndWatch(t *testing.T) {
<-time.After(time.Second)
- ws.HandleWatchWebSocketJob(nil)
+ ws.HandleEvent(nil)
- ws.heartbeat(websocket.PingMessage)
- ws.heartbeat(websocket.PongMessage)
+ ws.Heartbeat(websocket.PingMessage)
+ ws.Heartbeat(websocket.PongMessage)
- ws.HandleWatchWebSocketJob(time.Now())
+ ws.HandleEvent(time.Now())
closeCh <- struct{}{}
<-time.After(time.Second)
- ws.heartbeat(websocket.PingMessage)
- ws.heartbeat(websocket.PongMessage)
+ ws.Heartbeat(websocket.PingMessage)
+ ws.Heartbeat(websocket.PongMessage)
w.OnMessage(nil)
- publisher.Stop()
+ wss.Instance().Stop()
}
diff --git a/server/notify/center.go b/server/event/center.go
similarity index 69%
rename from server/notify/center.go
rename to server/event/center.go
index 4a0ad80..732a762 100644
--- a/server/notify/center.go
+++ b/server/event/center.go
@@ -13,19 +13,21 @@
// See the License for the specific language governing permissions and
// limitations under the License.
-package notify
+package event
import (
- "github.com/apache/servicecomb-service-center/pkg/notify"
+ "github.com/apache/servicecomb-service-center/pkg/event"
)
-var INSTANCE = notify.RegisterType("INSTANCE", InstanceEventQueueSize)
-var notifyService *notify.Service
+var busService *event.BusService
func init() {
- notifyService = notify.NewNotifyService()
+ busService = event.NewBusService()
}
-func GetNotifyCenter() *notify.Service {
- return notifyService
+// Center handle diff types of events
+// event type can be 'ALARM'(biz alarms), 'RESOURCE'(resource changes, like
INSTANCE) or
+// inner type 'NOTIFY'(subscriber health check)
+func Center() *event.BusService {
+ return busService
}
diff --git a/server/notify/listwatcher.go b/server/event/instance_subscriber.go
similarity index 83%
rename from server/notify/listwatcher.go
rename to server/event/instance_subscriber.go
index 6de175f..e13d0e9 100644
--- a/server/notify/listwatcher.go
+++ b/server/event/instance_subscriber.go
@@ -15,27 +15,34 @@
* limitations under the License.
*/
-package notify
+package event
import (
"context"
+ "github.com/apache/servicecomb-service-center/pkg/event"
"github.com/apache/servicecomb-service-center/pkg/gopool"
"github.com/apache/servicecomb-service-center/pkg/log"
- "github.com/apache/servicecomb-service-center/pkg/notify"
pb "github.com/apache/servicecomb-service-center/pkg/registry"
simple "github.com/apache/servicecomb-service-center/pkg/time"
"time"
)
+const (
+ AddJobTimeout = 1 * time.Second
+ EventQueueSize = 5000
+)
+
+var INSTANCE = event.RegisterType("INSTANCE", EventQueueSize)
+
// 状态变化推送
type InstanceEvent struct {
- notify.Event
+ event.Event
Revision int64
Response *pb.WatchInstanceResponse
}
type InstanceEventListWatcher struct {
- notify.Subscriber
+ event.Subscriber
Job chan *InstanceEvent
ListRevision int64
ListFunc func() (results []*pb.WatchInstanceResponse, rev int64)
@@ -45,7 +52,7 @@ type InstanceEventListWatcher struct {
func (w *InstanceEventListWatcher) SetError(err error) {
w.Subscriber.SetError(err)
// 触发清理job
- e := w.Service().Publish(notify.NewNotifyServiceHealthCheckJob(w))
+ e := w.Bus().Fire(event.NewUnhealthyEvent(w))
if e != nil {
log.Error("", e)
}
@@ -55,7 +62,7 @@ func (w *InstanceEventListWatcher) OnAccept() {
if w.Err() != nil {
return
}
- log.Debugf("accepted by notify service, %s watcher %s %s", w.Type(),
w.Group(), w.Subject())
+ log.Debugf("accepted by event service, %s watcher %s %s", w.Type(),
w.Group(), w.Subject())
gopool.Go(w.listAndPublishJobs)
}
@@ -72,7 +79,7 @@ func (w *InstanceEventListWatcher) listAndPublishJobs(_
context.Context) {
}
//被通知
-func (w *InstanceEventListWatcher) OnMessage(job notify.Event) {
+func (w *InstanceEventListWatcher) OnMessage(job event.Event) {
if w.Err() != nil {
return
}
@@ -97,7 +104,7 @@ func (w *InstanceEventListWatcher) OnMessage(job
notify.Event) {
}
if wJob.Revision <= w.ListRevision {
- log.Warnf("unexpected notify %s job is coming in, watcher %s
%s, job is %v, current revision is %v",
+ log.Warnf("unexpected event %s job is coming in, watcher %s %s,
job is %v, current revision is %v",
w.Type(), w.Group(), w.Subject(), job, w.ListRevision)
return
}
@@ -131,7 +138,7 @@ func (w *InstanceEventListWatcher) Close() {
func NewInstanceEvent(serviceID, domainProject string, rev int64, response
*pb.WatchInstanceResponse) *InstanceEvent {
return &InstanceEvent{
- Event: notify.NewEvent(INSTANCE, domainProject, serviceID),
+ Event: event.NewEvent(INSTANCE, domainProject, serviceID),
Revision: rev,
Response: response,
}
@@ -139,7 +146,7 @@ func NewInstanceEvent(serviceID, domainProject string, rev
int64, response *pb.W
func NewInstanceEventWithTime(serviceID, domainProject string, rev int64,
createAt simple.Time, response *pb.WatchInstanceResponse) *InstanceEvent {
return &InstanceEvent{
- Event: notify.NewEventWithTime(INSTANCE, domainProject,
serviceID, createAt),
+ Event: event.NewEventWithTime(INSTANCE, domainProject,
serviceID, createAt),
Revision: rev,
Response: response,
}
@@ -148,7 +155,7 @@ func NewInstanceEventWithTime(serviceID, domainProject
string, rev int64, create
func NewInstanceEventListWatcher(serviceID, domainProject string,
listFunc func() (results []*pb.WatchInstanceResponse, rev int64))
*InstanceEventListWatcher {
watcher := &InstanceEventListWatcher{
- Subscriber: notify.NewSubscriber(INSTANCE, domainProject,
serviceID),
+ Subscriber: event.NewSubscriber(INSTANCE, domainProject,
serviceID),
Job: make(chan *InstanceEvent, INSTANCE.QueueSize()),
ListFunc: listFunc,
listCh: make(chan struct{}),
diff --git a/server/health/health_test.go b/server/health/health_test.go
index afd5786..804df1e 100644
--- a/server/health/health_test.go
+++ b/server/health/health_test.go
@@ -17,13 +17,13 @@ package health
import (
"github.com/apache/servicecomb-service-center/server/alarm"
- "github.com/apache/servicecomb-service-center/server/notify"
+ "github.com/apache/servicecomb-service-center/server/event"
"testing"
"time"
)
func TestDefaultHealthChecker_Healthy(t *testing.T) {
- notify.GetNotifyCenter().Start()
+ event.Center().Start()
// normal case
var hc DefaultHealthChecker
diff --git a/server/notify/common.go b/server/notify/common.go
deleted file mode 100644
index d0db027..0000000
--- a/server/notify/common.go
+++ /dev/null
@@ -1,29 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one or more
-// contributor license agreements. See the NOTICE file distributed with
-// this work for additional information regarding copyright ownership.
-// The ASF licenses this file to You under the Apache License, Version 2.0
-// (the "License"); you may not use this file except in compliance with
-// the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package notify
-
-import "time"
-
-const (
- AddJobTimeout = 1 * time.Second
- HeartbeatInterval = 30 * time.Second
- ReadTimeout = HeartbeatInterval * 4
- SendTimeout = 5 * time.Second
- InstanceEventQueueSize = 5000
- ReadMaxBody = 64
- Websocket = "Websocket"
- GRPC = "gRPC"
-)
diff --git a/server/plugin/discovery/k8s/adaptor/listwatcher.go
b/server/plugin/discovery/k8s/adaptor/listwatcher.go
index 520d400..02874d0 100644
--- a/server/plugin/discovery/k8s/adaptor/listwatcher.go
+++ b/server/plugin/discovery/k8s/adaptor/listwatcher.go
@@ -59,16 +59,16 @@ func NewListWatcher(t K8sType, lister
cache.SharedIndexInformer, f OnEventFunc)
lw.AddEventHandler(
cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
- Queue(t).Add(queue.Task{Object:
K8sEvent{EventType: pb.EVT_CREATE, Object: obj}})
+ Queue(t).Add(queue.Task{Payload:
K8sEvent{EventType: pb.EVT_CREATE, Object: obj}})
},
UpdateFunc: func(old, new interface{}) {
if !reflect.DeepEqual(old, new) {
- Queue(t).Add(queue.Task{Object:
K8sEvent{EventType: pb.EVT_UPDATE, Object: new,
+ Queue(t).Add(queue.Task{Payload:
K8sEvent{EventType: pb.EVT_UPDATE, Object: new,
PrevObject: old}})
}
},
DeleteFunc: func(obj interface{}) {
- Queue(t).Add(queue.Task{Object:
K8sEvent{EventType: pb.EVT_DELETE, Object: obj}})
+ Queue(t).Add(queue.Task{Payload:
K8sEvent{EventType: pb.EVT_DELETE, Object: obj}})
},
})
Queue(t).AddWorker(lw)
diff --git a/server/server.go b/server/server.go
index de22449..c724de0 100644
--- a/server/server.go
+++ b/server/server.go
@@ -28,13 +28,13 @@ import (
"time"
"context"
+ nf "github.com/apache/servicecomb-service-center/pkg/event"
"github.com/apache/servicecomb-service-center/pkg/gopool"
"github.com/apache/servicecomb-service-center/pkg/log"
- nf "github.com/apache/servicecomb-service-center/pkg/notify"
"github.com/apache/servicecomb-service-center/server/core"
"github.com/apache/servicecomb-service-center/server/core/backend"
+ "github.com/apache/servicecomb-service-center/server/event"
"github.com/apache/servicecomb-service-center/server/mux"
- "github.com/apache/servicecomb-service-center/server/notify"
"github.com/apache/servicecomb-service-center/server/plugin"
serviceUtil
"github.com/apache/servicecomb-service-center/server/service/util"
"github.com/apache/servicecomb-service-center/server/task"
@@ -51,10 +51,10 @@ func Run() {
}
type ServiceCenterServer struct {
- apiService *APIServer
- notifyService *nf.Service
- cacheService *backend.KvStore
- goroutine *gopool.Pool
+ apiService *APIServer
+ eventCenter *nf.BusService
+ cacheService *backend.KvStore
+ goroutine *gopool.Pool
}
func (s *ServiceCenterServer) Run() {
@@ -187,13 +187,13 @@ func (s *ServiceCenterServer) clearNoInstanceServices() {
func (s *ServiceCenterServer) initialize() {
s.cacheService = backend.Store()
s.apiService = GetAPIServer()
- s.notifyService = notify.GetNotifyCenter()
+ s.eventCenter = event.Center()
s.goroutine = gopool.New(context.Background())
}
func (s *ServiceCenterServer) startServices() {
// notifications
- s.notifyService.Start()
+ s.eventCenter.Start()
// load server plugins
plugin.LoadPlugins()
@@ -240,8 +240,8 @@ func (s *ServiceCenterServer) Stop() {
s.apiService.Stop()
}
- if s.notifyService != nil {
- s.notifyService.Stop()
+ if s.eventCenter != nil {
+ s.eventCenter.Stop()
}
if s.cacheService != nil {
diff --git a/server/service/event/instance_event_handler.go
b/server/service/event/instance_event_handler.go
index c355760..0d6010d 100644
--- a/server/service/event/instance_event_handler.go
+++ b/server/service/event/instance_event_handler.go
@@ -24,7 +24,7 @@ import (
apt "github.com/apache/servicecomb-service-center/server/core"
"github.com/apache/servicecomb-service-center/server/core/backend"
"github.com/apache/servicecomb-service-center/server/core/proto"
- "github.com/apache/servicecomb-service-center/server/notify"
+ "github.com/apache/servicecomb-service-center/server/event"
"github.com/apache/servicecomb-service-center/server/plugin/discovery"
"github.com/apache/servicecomb-service-center/server/service/cache"
"github.com/apache/servicecomb-service-center/server/service/metrics"
@@ -82,7 +82,7 @@ func (h *InstanceEventHandler) OnEvent(evt discovery.KvEvent)
{
}
}
- if notify.GetNotifyCenter().Closed() {
+ if event.Center().Closed() {
log.Warnf("caught [%s] instance[%s/%s] event, endpoints %v, but
notify service is closed",
action, providerID, providerInstanceID,
instance.Endpoints)
return
@@ -136,10 +136,10 @@ func PublishInstanceEvent(evt discovery.KvEvent,
domainProject string, serviceKe
}
for _, consumerID := range subscribers {
// TODO add超时怎么处理?
- job := notify.NewInstanceEventWithTime(consumerID,
domainProject, evt.Revision, evt.CreateAt, response)
- err := notify.GetNotifyCenter().Publish(job)
+ evt := event.NewInstanceEventWithTime(consumerID,
domainProject, evt.Revision, evt.CreateAt, response)
+ err := event.Center().Fire(evt)
if err != nil {
- log.Errorf(err, "publish job failed")
+ log.Errorf(err, "publish event[%v] into channel
failed", evt)
}
}
}
diff --git a/server/service/event/rule_event_handler.go
b/server/service/event/rule_event_handler.go
index dedf31e..d719800 100644
--- a/server/service/event/rule_event_handler.go
+++ b/server/service/event/rule_event_handler.go
@@ -26,7 +26,7 @@ import (
"github.com/apache/servicecomb-service-center/server/core"
"github.com/apache/servicecomb-service-center/server/core/backend"
"github.com/apache/servicecomb-service-center/server/core/proto"
- "github.com/apache/servicecomb-service-center/server/notify"
+ "github.com/apache/servicecomb-service-center/server/event"
"github.com/apache/servicecomb-service-center/server/plugin/discovery"
serviceUtil
"github.com/apache/servicecomb-service-center/server/service/util"
)
@@ -98,7 +98,7 @@ func (h *RuleEventHandler) OnEvent(evt discovery.KvEvent) {
}
providerID, ruleID, domainProject := core.GetInfoFromRuleKV(evt.KV.Key)
- if notify.GetNotifyCenter().Closed() {
+ if event.Center().Closed() {
log.Warnf("caught [%s] service rule[%s/%s] event, but notify
service is closed",
action, providerID, ruleID)
return
diff --git a/server/service/event/tag_event_handler.go
b/server/service/event/tag_event_handler.go
index b66fed9..7416f2d 100644
--- a/server/service/event/tag_event_handler.go
+++ b/server/service/event/tag_event_handler.go
@@ -26,7 +26,7 @@ import (
"github.com/apache/servicecomb-service-center/server/core"
"github.com/apache/servicecomb-service-center/server/core/backend"
"github.com/apache/servicecomb-service-center/server/core/proto"
- "github.com/apache/servicecomb-service-center/server/notify"
+ "github.com/apache/servicecomb-service-center/server/event"
"github.com/apache/servicecomb-service-center/server/plugin/discovery"
"github.com/apache/servicecomb-service-center/server/service/cache"
serviceUtil "github.com/apache/servicecomb-service-center/server/service/util"
@@ -112,7 +112,7 @@ func (h *TagEventHandler) OnEvent(evt discovery.KvEvent) {
consumerID, domainProject := core.GetInfoFromTagKV(evt.KV.Key)
- if notify.GetNotifyCenter().Closed() {
+ if event.Center().Closed() {
log.Warnf("caught [%s] service tags[%s/%s] event, but notify service is closed",
action, consumerID, evt.KV.Value)
return
diff --git a/server/service/watch.go b/server/service/watch.go
index a30b5f2..2163f03 100644
--- a/server/service/watch.go
+++ b/server/service/watch.go
@@ -23,8 +23,9 @@ import (
"github.com/apache/servicecomb-service-center/pkg/log"
pb "github.com/apache/servicecomb-service-center/pkg/registry"
"github.com/apache/servicecomb-service-center/pkg/util"
+ "github.com/apache/servicecomb-service-center/server/connection/grpc"
+ "github.com/apache/servicecomb-service-center/server/connection/ws"
"github.com/apache/servicecomb-service-center/server/core/proto"
- "github.com/apache/servicecomb-service-center/server/notify"
serviceUtil "github.com/apache/servicecomb-service-center/server/service/util"
"github.com/gorilla/websocket"
)
@@ -47,25 +48,25 @@ func (s *InstanceService) Watch(in
*pb.WatchInstanceRequest, stream proto.Servic
return err
}
- return notify.DoStreamListAndWatch(stream.Context(), in.SelfServiceId,
nil, stream)
+ return grpc.ListAndWatch(stream.Context(), in.SelfServiceId, nil,
stream)
}
func (s *InstanceService) WebSocketWatch(ctx context.Context, in *pb.WatchInstanceRequest, conn *websocket.Conn) {
log.Infof("new a web socket watch with service[%s]", in.SelfServiceId)
if err := s.WatchPreOpera(ctx, in); err != nil {
- notify.EstablishWebSocketError(conn, err)
+ ws.SendEstablishError(conn, err)
return
}
- notify.DoWebSocketListAndWatch(ctx, in.SelfServiceId, nil, conn)
+ ws.ListAndWatch(ctx, in.SelfServiceId, nil, conn)
}
func (s *InstanceService) WebSocketListAndWatch(ctx context.Context, in *pb.WatchInstanceRequest, conn *websocket.Conn) {
log.Infof("new a web socket list and watch with service[%s]",
in.SelfServiceId)
if err := s.WatchPreOpera(ctx, in); err != nil {
- notify.EstablishWebSocketError(conn, err)
+ ws.SendEstablishError(conn, err)
return
}
- notify.DoWebSocketListAndWatch(ctx, in.SelfServiceId, func() ([]*pb.WatchInstanceResponse, int64) {
+ ws.ListAndWatch(ctx, in.SelfServiceId, func() ([]*pb.WatchInstanceResponse, int64) {
return serviceUtil.QueryAllProvidersInstances(ctx,
in.SelfServiceId)
}, conn)
}