This is an automated email from the ASF dual-hosted git repository.
tianxiaoliang pushed a commit to branch v1.x
in repository https://gitbox.apache.org/repos/asf/servicecomb-service-center.git
The following commit(s) were added to refs/heads/v1.x by this push:
new 4bcda71 SCB-2176 Refactor websocket (#980)
4bcda71 is described below
commit 4bcda712b0eb08b106dccf335b4cbdaf1404c6a0
Author: little-cui <[email protected]>
AuthorDate: Mon May 17 11:01:24 2021 +0800
SCB-2176 Refactor websocket (#980)
---
pkg/event/bus.go | 6 +-
pkg/event/subscriber.go | 5 +-
server/connection/grpc/stream.go | 4 +-
server/connection/grpc/stream_test.go | 2 +-
server/connection/ws/common.go | 56 ++++++
.../connection/ws/{publisher.go => keepalive.go} | 28 +--
server/connection/ws/options.go | 36 ++++
server/connection/ws/websocket.go | 196 +++++++++------------
server/connection/ws/websocket_test.go | 14 +-
server/event/instance_event.go | 51 ++++++
server/event/instance_subscriber.go | 53 ++----
11 files changed, 267 insertions(+), 184 deletions(-)
diff --git a/pkg/event/bus.go b/pkg/event/bus.go
index 3735b0a..3c59341 100644
--- a/pkg/event/bus.go
+++ b/pkg/event/bus.go
@@ -40,14 +40,14 @@ func (bus *Bus) Fire(evt Event) {
bus.Add(queue.Task{Payload: evt})
}
-func (bus *Bus) Handle(ctx context.Context, evt interface{}) {
- bus.fireAtOnce(evt.(Event))
+func (bus *Bus) Handle(ctx context.Context, payload interface{}) {
+ bus.fireAtOnce(payload.(Event))
}
func (bus *Bus) fireAtOnce(evt Event) {
if itf, ok := bus.subjects.Get(evt.Subject()); ok {
itf.(*Poster).Post(evt)
- }
+ } // else no poster is registered for the subject, so evt is discarded
}
func (bus *Bus) Subjects(name string) *Poster {
diff --git a/pkg/event/subscriber.go b/pkg/event/subscriber.go
index 1a3048b..583213e 100644
--- a/pkg/event/subscriber.go
+++ b/pkg/event/subscriber.go
@@ -30,12 +30,15 @@ type Subscriber interface {
Bus() *BusService
SetBus(*BusService)
+ // Err returns the subscriber's error; the event bus removes the subscriber automatically if it is not nil.
+ // Implementations of OnMessage should call SetError when they run into an exception.
Err() error
SetError(err error)
Close()
+ // OnAccept is called when the subscriber has been added to the event bus successfully.
OnAccept()
- // The event bus will callback this function, so it must be non-blocked.
+ // OnMessage is called when the event bus fires a message; it must be non-blocking.
OnMessage(Event)
}
diff --git a/server/connection/grpc/stream.go b/server/connection/grpc/stream.go
index 23c56a3..0609bdb 100644
--- a/server/connection/grpc/stream.go
+++ b/server/connection/grpc/stream.go
@@ -31,7 +31,7 @@ import (
const GRPC = "gRPC"
-func Handle(watcher *event.InstanceEventListWatcher, stream
proto.ServiceInstanceCtrl_WatchServer) (err error) {
+func Handle(watcher *event.InstanceSubscriber, stream
proto.ServiceInstanceCtrl_WatchServer) (err error) {
timer := time.NewTimer(connection.HeartbeatInterval)
defer timer.Stop()
for {
@@ -70,7 +70,7 @@ func Handle(watcher *event.InstanceEventListWatcher, stream
proto.ServiceInstanc
func ListAndWatch(ctx context.Context, serviceID string, f func()
([]*pb.WatchInstanceResponse, int64), stream
proto.ServiceInstanceCtrl_WatchServer) (err error) {
domainProject := util.ParseDomainProject(ctx)
domain := util.ParseDomain(ctx)
- watcher := event.NewInstanceEventListWatcher(serviceID, domainProject,
f)
+ watcher := event.NewInstanceSubscriber(serviceID, domainProject, f)
err = event.Center().AddSubscriber(watcher)
if err != nil {
return
diff --git a/server/connection/grpc/stream_test.go
b/server/connection/grpc/stream_test.go
index b20fa5d..6ee8eb3 100644
--- a/server/connection/grpc/stream_test.go
+++ b/server/connection/grpc/stream_test.go
@@ -41,7 +41,7 @@ func (x *grpcWatchServer) Context() context.Context {
}
func TestHandleWatchJob(t *testing.T) {
- w := event.NewInstanceEventListWatcher("g", "s", nil)
+ w := event.NewInstanceSubscriber("g", "s", nil)
w.Job <- nil
err := stream.Handle(w, &grpcWatchServer{})
if err == nil {
diff --git a/server/connection/ws/common.go b/server/connection/ws/common.go
new file mode 100644
index 0000000..15a060d
--- /dev/null
+++ b/server/connection/ws/common.go
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package ws
+
+import (
+ "context"
+ "github.com/apache/servicecomb-service-center/pkg/log"
+ "github.com/apache/servicecomb-service-center/pkg/registry"
+ "github.com/apache/servicecomb-service-center/pkg/util"
+ "github.com/apache/servicecomb-service-center/server/connection"
+ "github.com/apache/servicecomb-service-center/server/event"
+ "github.com/gorilla/websocket"
+)
+
+func ListAndWatch(ctx context.Context, serviceID string, f func()
([]*registry.WatchInstanceResponse, int64), conn *websocket.Conn) {
+ domainProject := util.ParseDomainProject(ctx)
+ domain := util.ParseDomain(ctx)
+ socket := New(ctx, conn, event.NewInstanceSubscriber(serviceID,
domainProject, f))
+
+ connection.ReportSubscriber(domain, Websocket, 1)
+ process(socket)
+ connection.ReportSubscriber(domain, Websocket, -1)
+}
+
+func process(socket *WebSocket) {
+ if err := socket.Init(); err != nil {
+ return
+ }
+
+ socket.HandleControlMessage()
+
+ socket.Stop()
+}
+
+func SendEstablishError(conn *websocket.Conn, err error) {
+ remoteAddr := conn.RemoteAddr().String()
+ log.Errorf(err, "establish[%s] websocket watch failed.", remoteAddr)
+ if err := conn.WriteMessage(websocket.TextMessage,
util.StringToBytesWithNoCopy(err.Error())); err != nil {
+ log.Errorf(err, "establish[%s] websocket watch failed: write
message failed.", remoteAddr)
+ }
+}
diff --git a/server/connection/ws/publisher.go
b/server/connection/ws/keepalive.go
similarity index 80%
rename from server/connection/ws/publisher.go
rename to server/connection/ws/keepalive.go
index a8ee692..3bd7e8b 100644
--- a/server/connection/ws/publisher.go
+++ b/server/connection/ws/keepalive.go
@@ -24,34 +24,34 @@ import (
"time"
)
-var publisher *Publisher
+var runner *KeepaliveRunner
func init() {
- publisher = NewPublisher()
- publisher.Run()
+ runner = NewRunner()
+ runner.Run()
}
-type Publisher struct {
+type KeepaliveRunner struct {
wss []*WebSocket
lock sync.Mutex
goroutine *gopool.Pool
}
-func (wh *Publisher) Run() {
- gopool.Go(publisher.loop)
+func (wh *KeepaliveRunner) Run() {
+ gopool.Go(runner.loop)
}
-func (wh *Publisher) Stop() {
+func (wh *KeepaliveRunner) Stop() {
wh.goroutine.Close(true)
}
-func (wh *Publisher) dispatch(ws *WebSocket, payload interface{}) {
+func (wh *KeepaliveRunner) dispatch(ws *WebSocket, payload interface{}) {
wh.goroutine.Do(func(ctx context.Context) {
ws.HandleEvent(payload)
})
}
-func (wh *Publisher) loop(ctx context.Context) {
+func (wh *KeepaliveRunner) loop(ctx context.Context) {
defer wh.Stop()
ticker := time.NewTicker(500 * time.Millisecond)
for {
@@ -91,18 +91,18 @@ func (wh *Publisher) loop(ctx context.Context) {
}
}
-func (wh *Publisher) Accept(ws *WebSocket) {
+func (wh *KeepaliveRunner) Accept(ws *WebSocket) {
wh.lock.Lock()
wh.wss = append(wh.wss, ws)
wh.lock.Unlock()
}
-func NewPublisher() *Publisher {
- return &Publisher{
+func NewRunner() *KeepaliveRunner {
+ return &KeepaliveRunner{
goroutine: gopool.New(context.Background()),
}
}
-func Instance() *Publisher {
- return publisher
+func Runner() *KeepaliveRunner {
+ return runner
}
diff --git a/server/connection/ws/options.go b/server/connection/ws/options.go
new file mode 100644
index 0000000..5817ef0
--- /dev/null
+++ b/server/connection/ws/options.go
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package ws
+
+import (
+ "time"
+
+ "github.com/apache/servicecomb-service-center/server/connection"
+)
+
+type Options struct {
+ ReadTimeout time.Duration
+ SendTimeout time.Duration
+}
+
+func ToOptions() Options {
+ return Options{
+ ReadTimeout: connection.ReadTimeout,
+ SendTimeout: connection.SendTimeout,
+ }
+}
diff --git a/server/connection/ws/websocket.go
b/server/connection/ws/websocket.go
index d1d01a9..fc6db9c 100644
--- a/server/connection/ws/websocket.go
+++ b/server/connection/ws/websocket.go
@@ -34,11 +34,13 @@ import (
const Websocket = "Websocket"
type WebSocket struct {
+ Options
+
ctx context.Context
ticker *time.Ticker
conn *websocket.Conn
// watcher subscribe the notification service event
- watcher *event.InstanceEventListWatcher
+ watcher *event.InstanceSubscriber
needPingWatcher bool
free chan struct{}
closed chan struct{}
@@ -67,43 +69,20 @@ func (wh *WebSocket) Init() error {
return err
}
- // put in publisher queue
- Instance().Accept(wh)
+ // put in runner queue
+ Runner().Accept(wh)
log.Debugf("start watching instance status, watcher[%s], subject: %s,
group: %s",
remoteAddr, wh.watcher.Subject(), wh.watcher.Group())
return nil
}
-func (wh *WebSocket) ReadTimeout() time.Duration {
- return connection.ReadTimeout
-}
-
-func (wh *WebSocket) SendTimeout() time.Duration {
- return connection.SendTimeout
-}
-
-func (wh *WebSocket) Heartbeat(messageType int) error {
- err := wh.conn.WriteControl(messageType, []byte{},
time.Now().Add(wh.SendTimeout()))
- if err != nil {
- messageTypeName := "Ping"
- if messageType == websocket.PongMessage {
- messageTypeName = "Pong"
- }
- log.Errorf(err, "fail to send '%s' to watcher[%s], subject: %s,
group: %s",
- messageTypeName, wh.conn.RemoteAddr(),
wh.watcher.Subject(), wh.watcher.Group())
- //wh.watcher.SetError(err)
- return err
- }
- return nil
-}
-
func (wh *WebSocket) HandleControlMessage() {
remoteAddr := wh.conn.RemoteAddr().String()
// PING
wh.conn.SetPingHandler(func(message string) error {
defer func() {
- err :=
wh.conn.SetReadDeadline(time.Now().Add(wh.ReadTimeout()))
+ err :=
wh.conn.SetReadDeadline(time.Now().Add(wh.ReadTimeout))
if err != nil {
log.Error("", err)
}
@@ -113,12 +92,12 @@ func (wh *WebSocket) HandleControlMessage() {
message, remoteAddr, wh.watcher.Subject(),
wh.watcher.Group())
}
wh.needPingWatcher = false
- return wh.Heartbeat(websocket.PongMessage)
+ return wh.WritePingPong(websocket.PongMessage)
})
// PONG
wh.conn.SetPongHandler(func(message string) error {
defer func() {
- err :=
wh.conn.SetReadDeadline(time.Now().Add(wh.ReadTimeout()))
+ err :=
wh.conn.SetReadDeadline(time.Now().Add(wh.ReadTimeout))
if err != nil {
log.Error("", err)
}
@@ -135,7 +114,7 @@ func (wh *WebSocket) HandleControlMessage() {
})
wh.conn.SetReadLimit(connection.ReadMaxBody)
- err := wh.conn.SetReadDeadline(time.Now().Add(wh.ReadTimeout()))
+ err := wh.conn.SetReadDeadline(time.Now().Add(wh.ReadTimeout))
if err != nil {
log.Error("", err)
}
@@ -155,7 +134,7 @@ func (wh *WebSocket) sendClose(code int, text string) error
{
if code != websocket.CloseNoStatusReceived {
message = websocket.FormatCloseMessage(code, text)
}
- err := wh.conn.WriteControl(websocket.CloseMessage, message,
time.Now().Add(wh.SendTimeout()))
+ err := wh.conn.WriteControl(websocket.CloseMessage, message,
time.Now().Add(wh.SendTimeout))
if err != nil {
log.Errorf(err, "watcher[%s] catch an err, subject: %s, group:
%s",
remoteAddr, wh.watcher.Subject(), wh.watcher.Group())
@@ -164,7 +143,7 @@ func (wh *WebSocket) sendClose(code int, text string) error
{
return nil
}
-// Pick will be called by publisher
+// Pick will be called by runner
func (wh *WebSocket) Pick() interface{} {
select {
case <-wh.Ready():
@@ -194,84 +173,98 @@ func (wh *WebSocket) Pick() interface{} {
func (wh *WebSocket) HandleEvent(o interface{}) {
defer wh.SetReady()
- var (
- message []byte
- remoteAddr = wh.conn.RemoteAddr().String()
- )
-
+ var remoteAddr = wh.conn.RemoteAddr().String()
switch o := o.(type) {
case error:
log.Errorf(o, "watcher[%s] catch an err, subject: %s, group:
%s",
remoteAddr, wh.watcher.Subject(), wh.watcher.Group())
-
- message = util.StringToBytesWithNoCopy(fmt.Sprintf("watcher
catch an err: %s", o.Error()))
+ _ = wh.write(util.StringToBytesWithNoCopy(fmt.Sprintf("watcher
catch an err: %s", o.Error())))
case time.Time:
- domainProject := util.ParseDomainProject(wh.ctx)
- if !serviceUtil.ServiceExist(wh.ctx, domainProject,
wh.watcher.Group()) {
- message = util.StringToBytesWithNoCopy("Service does
not exit.")
- break
- }
+ wh.Keepalive()
+ case *event.InstanceEvent:
+ wh.WriteInstanceEvent(o)
+ default:
+ log.Errorf(nil, "watcher[%s] unknown input %v, subject: %s,
group: %s",
+ remoteAddr, o, wh.watcher.Subject(), wh.watcher.Group())
+ }
+}
- if !wh.needPingWatcher {
- return
- }
+func (wh *WebSocket) Keepalive() {
+ domainProject := util.ParseDomainProject(wh.ctx)
+ if !serviceUtil.ServiceExist(wh.ctx, domainProject, wh.watcher.Group())
{
+ _ = wh.write(util.StringToBytesWithNoCopy("Service does not
exit."))
+ return
+ }
- if err := wh.Heartbeat(websocket.PingMessage); err != nil {
- log.Errorf(err, "send 'Ping' message to watcher[%s]
failed, subject: %s, group: %s",
- remoteAddr, wh.watcher.Subject(),
wh.watcher.Group())
- return
- }
+ if !wh.needPingWatcher {
+ return
+ }
- log.Debugf("send 'Ping' message to watcher[%s], subject: %s,
group: %s",
+ remoteAddr := wh.conn.RemoteAddr().String()
+ if err := wh.WritePingPong(websocket.PingMessage); err != nil {
+ log.Errorf(err, "send 'Ping' message to watcher[%s] failed,
subject: %s, group: %s",
remoteAddr, wh.watcher.Subject(), wh.watcher.Group())
return
- case *event.InstanceEvent:
- resp := o.Response
+ }
- providerFlag := fmt.Sprintf("%s/%s/%s", resp.Key.AppId,
resp.Key.ServiceName, resp.Key.Version)
- if resp.Action != string(pb.EVT_EXPIRE) {
- providerFlag = fmt.Sprintf("%s/%s(%s)",
resp.Instance.ServiceId, resp.Instance.InstanceId, providerFlag)
- }
- log.Infof("event[%s] is coming in, watcher[%s] watch %s,
subject: %s, group: %s",
- resp.Action, remoteAddr, providerFlag,
wh.watcher.Subject(), wh.watcher.Group())
+ log.Debugf("send 'Ping' message to watcher[%s], subject: %s, group: %s",
+ remoteAddr, wh.watcher.Subject(), wh.watcher.Group())
+ return
+}
- resp.Response = nil
- data, err := json.Marshal(resp)
- if err != nil {
- log.Errorf(err, "watcher[%s] watch %s, subject: %s,
group: %s",
- remoteAddr, providerFlag, o,
wh.watcher.Subject(), wh.watcher.Group())
- message =
util.StringToBytesWithNoCopy(fmt.Sprintf("marshal output file error, %s",
err.Error()))
- break
+func (wh *WebSocket) WritePingPong(messageType int) error {
+ err := wh.conn.WriteControl(messageType, []byte{},
time.Now().Add(wh.SendTimeout))
+ if err != nil {
+ messageTypeName := "Ping"
+ if messageType == websocket.PongMessage {
+ messageTypeName = "Pong"
}
- message = data
- default:
- log.Errorf(nil, "watcher[%s] unknown input %v, subject: %s,
group: %s",
- remoteAddr, o, wh.watcher.Subject(), wh.watcher.Group())
- return
+ log.Errorf(err, "fail to send '%s' to watcher[%s], subject: %s,
group: %s",
+ messageTypeName, wh.conn.RemoteAddr(),
wh.watcher.Subject(), wh.watcher.Group())
+ //wh.watcher.SetError(err)
+ return err
}
+ return nil
+}
- select {
- case <-wh.closed:
- return
- default:
+func (wh *WebSocket) WriteInstanceEvent(evt *event.InstanceEvent) {
+ resp := evt.Response
+ providerFlag := fmt.Sprintf("%s/%s/%s", resp.Key.AppId,
resp.Key.ServiceName, resp.Key.Version)
+ if resp.Action != string(pb.EVT_EXPIRE) {
+ providerFlag = fmt.Sprintf("%s/%s(%s)",
resp.Instance.ServiceId, resp.Instance.InstanceId, providerFlag)
}
+ remoteAddr := wh.conn.RemoteAddr().String()
+ log.Infof("event[%s] is coming in, watcher[%s] watch %s, subject: %s,
group: %s",
+ resp.Action, remoteAddr, providerFlag, wh.watcher.Subject(),
wh.watcher.Group())
- err := wh.WriteMessage(message)
- if evt, ok := o.(*event.InstanceEvent); ok {
- connection.ReportPublishCompleted(evt, err)
- }
+ resp.Response = nil
+ data, err := json.Marshal(resp)
if err != nil {
- log.Errorf(err, "watcher[%s] catch an err, subject: %s, group:
%s",
- remoteAddr, wh.watcher.Subject(), wh.watcher.Group())
+ log.Errorf(err, "watcher[%s] watch %s, subject: %s, group: %s",
+ remoteAddr, providerFlag, evt, wh.watcher.Subject(),
wh.watcher.Group())
+ data = util.StringToBytesWithNoCopy(fmt.Sprintf("marshal output
file error, %s", err.Error()))
}
+ err = wh.write(data)
+ connection.ReportPublishCompleted(evt, err)
}
-func (wh *WebSocket) WriteMessage(message []byte) error {
- err := wh.conn.SetWriteDeadline(time.Now().Add(wh.SendTimeout()))
+func (wh *WebSocket) write(message []byte) error {
+ select {
+ case <-wh.closed:
+ return nil
+ default:
+ }
+
+ err := wh.conn.SetWriteDeadline(time.Now().Add(wh.SendTimeout))
if err != nil {
return err
}
- return wh.conn.WriteMessage(websocket.TextMessage, message)
+ err = wh.conn.WriteMessage(websocket.TextMessage, message)
+ if err != nil {
+ log.Errorf(err, "watcher[%s] catch an err, subject: %s, group:
%s",
+ wh.conn.RemoteAddr().String(), wh.watcher.Subject(),
wh.watcher.Group())
+ }
+ return err
}
func (wh *WebSocket) Ready() <-chan struct{} {
@@ -289,36 +282,9 @@ func (wh *WebSocket) Stop() {
close(wh.closed)
}
-func ListAndWatch(ctx context.Context, serviceID string, f func()
([]*pb.WatchInstanceResponse, int64), conn *websocket.Conn) {
- domainProject := util.ParseDomainProject(ctx)
- domain := util.ParseDomain(ctx)
- socket := New(ctx, conn, event.NewInstanceEventListWatcher(serviceID,
domainProject, f))
-
- connection.ReportSubscriber(domain, Websocket, 1)
- process(socket)
- connection.ReportSubscriber(domain, Websocket, -1)
-}
-
-func process(socket *WebSocket) {
- if err := socket.Init(); err != nil {
- return
- }
-
- socket.HandleControlMessage()
-
- socket.Stop()
-}
-
-func SendEstablishError(conn *websocket.Conn, err error) {
- remoteAddr := conn.RemoteAddr().String()
- log.Errorf(err, "establish[%s] websocket watch failed.", remoteAddr)
- if err := conn.WriteMessage(websocket.TextMessage,
util.StringToBytesWithNoCopy(err.Error())); err != nil {
- log.Errorf(err, "establish[%s] websocket watch failed: write
message failed.", remoteAddr)
- }
-}
-
-func New(ctx context.Context, conn *websocket.Conn, watcher
*event.InstanceEventListWatcher) *WebSocket {
+func New(ctx context.Context, conn *websocket.Conn, watcher
*event.InstanceSubscriber) *WebSocket {
return &WebSocket{
+ Options: ToOptions(),
ctx: ctx,
conn: conn,
watcher: watcher,
diff --git a/server/connection/ws/websocket_test.go
b/server/connection/ws/websocket_test.go
index b7895eb..8338b8f 100644
--- a/server/connection/ws/websocket_test.go
+++ b/server/connection/ws/websocket_test.go
@@ -68,7 +68,7 @@ func TestDoWebSocketListAndWatch(t *testing.T) {
wss.SendEstablishError(conn, errors.New("error"))
- w := event.NewInstanceEventListWatcher("g", "s", func() (results
[]*registry.WatchInstanceResponse, rev int64) {
+ w := event.NewInstanceSubscriber("g", "s", func() (results
[]*registry.WatchInstanceResponse, rev int64) {
results = append(results, &registry.WatchInstanceResponse{
Response: proto.CreateResponse(proto.Response_SUCCESS,
"ok"),
Action: string(registry.EVT_CREATE),
@@ -89,7 +89,7 @@ func TestDoWebSocketListAndWatch(t *testing.T) {
go func() {
wss.ListAndWatch(context.Background(), "", nil, conn)
- w2 := event.NewInstanceEventListWatcher("g", "s", func()
(results []*registry.WatchInstanceResponse, rev int64) {
+ w2 := event.NewInstanceSubscriber("g", "s", func() (results
[]*registry.WatchInstanceResponse, rev int64) {
return
})
ws2 := wss.New(context.Background(), conn, w2)
@@ -115,8 +115,8 @@ func TestDoWebSocketListAndWatch(t *testing.T) {
ws.HandleEvent(nil)
- ws.Heartbeat(websocket.PingMessage)
- ws.Heartbeat(websocket.PongMessage)
+ ws.WritePingPong(websocket.PingMessage)
+ ws.WritePingPong(websocket.PongMessage)
ws.HandleEvent(time.Now())
@@ -124,10 +124,10 @@ func TestDoWebSocketListAndWatch(t *testing.T) {
<-time.After(time.Second)
- ws.Heartbeat(websocket.PingMessage)
- ws.Heartbeat(websocket.PongMessage)
+ ws.WritePingPong(websocket.PingMessage)
+ ws.WritePingPong(websocket.PongMessage)
w.OnMessage(nil)
- wss.Instance().Stop()
+ wss.Runner().Stop()
}
diff --git a/server/event/instance_event.go b/server/event/instance_event.go
new file mode 100644
index 0000000..0098187
--- /dev/null
+++ b/server/event/instance_event.go
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package event
+
+import (
+ "github.com/apache/servicecomb-service-center/pkg/event"
+ pb "github.com/apache/servicecomb-service-center/pkg/registry"
+ simple "github.com/apache/servicecomb-service-center/pkg/time"
+)
+
+const QueueSize = 5000
+
+var INSTANCE = event.RegisterType("INSTANCE", QueueSize)
+
+// InstanceEvent is the notification pushed when a status change occurs.
+type InstanceEvent struct {
+ event.Event
+ Revision int64
+ Response *pb.WatchInstanceResponse
+}
+
+func NewInstanceEvent(serviceID, domainProject string, rev int64, response
*pb.WatchInstanceResponse) *InstanceEvent {
+ return &InstanceEvent{
+ Event: event.NewEvent(INSTANCE, domainProject, serviceID),
+ Revision: rev,
+ Response: response,
+ }
+}
+
+func NewInstanceEventWithTime(serviceID, domainProject string, rev int64,
createAt simple.Time, response *pb.WatchInstanceResponse) *InstanceEvent {
+ return &InstanceEvent{
+ Event: event.NewEventWithTime(INSTANCE, domainProject,
serviceID, createAt),
+ Revision: rev,
+ Response: response,
+ }
+}
diff --git a/server/event/instance_subscriber.go
b/server/event/instance_subscriber.go
index e13d0e9..c5fad5a 100644
--- a/server/event/instance_subscriber.go
+++ b/server/event/instance_subscriber.go
@@ -23,25 +23,12 @@ import (
"github.com/apache/servicecomb-service-center/pkg/gopool"
"github.com/apache/servicecomb-service-center/pkg/log"
pb "github.com/apache/servicecomb-service-center/pkg/registry"
- simple "github.com/apache/servicecomb-service-center/pkg/time"
"time"
)
-const (
- AddJobTimeout = 1 * time.Second
- EventQueueSize = 5000
-)
-
-var INSTANCE = event.RegisterType("INSTANCE", EventQueueSize)
+const AddJobTimeout = 1 * time.Second
-// 状态变化推送
-type InstanceEvent struct {
- event.Event
- Revision int64
- Response *pb.WatchInstanceResponse
-}
-
-type InstanceEventListWatcher struct {
+type InstanceSubscriber struct {
event.Subscriber
Job chan *InstanceEvent
ListRevision int64
@@ -49,7 +36,7 @@ type InstanceEventListWatcher struct {
listCh chan struct{}
}
-func (w *InstanceEventListWatcher) SetError(err error) {
+func (w *InstanceSubscriber) SetError(err error) {
w.Subscriber.SetError(err)
// 触发清理job
e := w.Bus().Fire(event.NewUnhealthyEvent(w))
@@ -58,7 +45,7 @@ func (w *InstanceEventListWatcher) SetError(err error) {
}
}
-func (w *InstanceEventListWatcher) OnAccept() {
+func (w *InstanceSubscriber) OnAccept() {
if w.Err() != nil {
return
}
@@ -66,7 +53,7 @@ func (w *InstanceEventListWatcher) OnAccept() {
gopool.Go(w.listAndPublishJobs)
}
-func (w *InstanceEventListWatcher) listAndPublishJobs(_ context.Context) {
+func (w *InstanceSubscriber) listAndPublishJobs(_ context.Context) {
defer close(w.listCh)
if w.ListFunc == nil {
return
@@ -79,7 +66,7 @@ func (w *InstanceEventListWatcher) listAndPublishJobs(_
context.Context) {
}
//被通知
-func (w *InstanceEventListWatcher) OnMessage(job event.Event) {
+func (w *InstanceSubscriber) OnMessage(job event.Event) {
if w.Err() != nil {
return
}
@@ -111,7 +98,7 @@ func (w *InstanceEventListWatcher) OnMessage(job
event.Event) {
w.sendMessage(wJob)
}
-func (w *InstanceEventListWatcher) sendMessage(job *InstanceEvent) {
+func (w *InstanceSubscriber) sendMessage(job *InstanceEvent) {
defer log.Recover()
select {
case w.Job <- job:
@@ -128,33 +115,17 @@ func (w *InstanceEventListWatcher) sendMessage(job
*InstanceEvent) {
}
}
-func (w *InstanceEventListWatcher) Timeout() time.Duration {
+func (w *InstanceSubscriber) Timeout() time.Duration {
return AddJobTimeout
}
-func (w *InstanceEventListWatcher) Close() {
+func (w *InstanceSubscriber) Close() {
close(w.Job)
}
-func NewInstanceEvent(serviceID, domainProject string, rev int64, response
*pb.WatchInstanceResponse) *InstanceEvent {
- return &InstanceEvent{
- Event: event.NewEvent(INSTANCE, domainProject, serviceID),
- Revision: rev,
- Response: response,
- }
-}
-
-func NewInstanceEventWithTime(serviceID, domainProject string, rev int64,
createAt simple.Time, response *pb.WatchInstanceResponse) *InstanceEvent {
- return &InstanceEvent{
- Event: event.NewEventWithTime(INSTANCE, domainProject,
serviceID, createAt),
- Revision: rev,
- Response: response,
- }
-}
-
-func NewInstanceEventListWatcher(serviceID, domainProject string,
- listFunc func() (results []*pb.WatchInstanceResponse, rev int64))
*InstanceEventListWatcher {
- watcher := &InstanceEventListWatcher{
+func NewInstanceSubscriber(serviceID, domainProject string,
+ listFunc func() (results []*pb.WatchInstanceResponse, rev int64))
*InstanceSubscriber {
+ watcher := &InstanceSubscriber{
Subscriber: event.NewSubscriber(INSTANCE, domainProject,
serviceID),
Job: make(chan *InstanceEvent, INSTANCE.QueueSize()),
ListFunc: listFunc,