This is an automated email from the ASF dual-hosted git repository.

tianxiaoliang pushed a commit to branch v1.x
in repository https://gitbox.apache.org/repos/asf/servicecomb-service-center.git


The following commit(s) were added to refs/heads/v1.x by this push:
     new 4bcda71  SCB-2176 Refactor websocket (#980)
4bcda71 is described below

commit 4bcda712b0eb08b106dccf335b4cbdaf1404c6a0
Author: little-cui <[email protected]>
AuthorDate: Mon May 17 11:01:24 2021 +0800

    SCB-2176 Refactor websocket (#980)
---
 pkg/event/bus.go                                   |   6 +-
 pkg/event/subscriber.go                            |   5 +-
 server/connection/grpc/stream.go                   |   4 +-
 server/connection/grpc/stream_test.go              |   2 +-
 server/connection/ws/common.go                     |  56 ++++++
 .../connection/ws/{publisher.go => keepalive.go}   |  28 +--
 server/connection/ws/options.go                    |  36 ++++
 server/connection/ws/websocket.go                  | 196 +++++++++------------
 server/connection/ws/websocket_test.go             |  14 +-
 server/event/instance_event.go                     |  51 ++++++
 server/event/instance_subscriber.go                |  53 ++----
 11 files changed, 267 insertions(+), 184 deletions(-)

diff --git a/pkg/event/bus.go b/pkg/event/bus.go
index 3735b0a..3c59341 100644
--- a/pkg/event/bus.go
+++ b/pkg/event/bus.go
@@ -40,14 +40,14 @@ func (bus *Bus) Fire(evt Event) {
        bus.Add(queue.Task{Payload: evt})
 }
 
-func (bus *Bus) Handle(ctx context.Context, evt interface{}) {
-       bus.fireAtOnce(evt.(Event))
+func (bus *Bus) Handle(ctx context.Context, payload interface{}) {
+       bus.fireAtOnce(payload.(Event))
 }
 
 func (bus *Bus) fireAtOnce(evt Event) {
        if itf, ok := bus.subjects.Get(evt.Subject()); ok {
                itf.(*Poster).Post(evt)
-       }
+       } // else the evt will be discarded
 }
 
 func (bus *Bus) Subjects(name string) *Poster {
diff --git a/pkg/event/subscriber.go b/pkg/event/subscriber.go
index 1a3048b..583213e 100644
--- a/pkg/event/subscriber.go
+++ b/pkg/event/subscriber.go
@@ -30,12 +30,15 @@ type Subscriber interface {
        Bus() *BusService
        SetBus(*BusService)
 
+       // Err returns the subscriber's error; the event bus removes the subscriber automatically if it is not nil.
+       // Implementations of OnMessage should call SetError when they hit an exception.
        Err() error
        SetError(err error)
 
        Close()
+       // OnAccept is called when the subscriber has been added to the event bus successfully
        OnAccept()
-       // The event bus will callback this function, so it must be non-blocked.
+       // OnMessage is called when the event bus fires a message; it must be non-blocking
        OnMessage(Event)
 }
 
diff --git a/server/connection/grpc/stream.go b/server/connection/grpc/stream.go
index 23c56a3..0609bdb 100644
--- a/server/connection/grpc/stream.go
+++ b/server/connection/grpc/stream.go
@@ -31,7 +31,7 @@ import (
 
 const GRPC = "gRPC"
 
-func Handle(watcher *event.InstanceEventListWatcher, stream 
proto.ServiceInstanceCtrl_WatchServer) (err error) {
+func Handle(watcher *event.InstanceSubscriber, stream 
proto.ServiceInstanceCtrl_WatchServer) (err error) {
        timer := time.NewTimer(connection.HeartbeatInterval)
        defer timer.Stop()
        for {
@@ -70,7 +70,7 @@ func Handle(watcher *event.InstanceEventListWatcher, stream 
proto.ServiceInstanc
 func ListAndWatch(ctx context.Context, serviceID string, f func() 
([]*pb.WatchInstanceResponse, int64), stream 
proto.ServiceInstanceCtrl_WatchServer) (err error) {
        domainProject := util.ParseDomainProject(ctx)
        domain := util.ParseDomain(ctx)
-       watcher := event.NewInstanceEventListWatcher(serviceID, domainProject, 
f)
+       watcher := event.NewInstanceSubscriber(serviceID, domainProject, f)
        err = event.Center().AddSubscriber(watcher)
        if err != nil {
                return
diff --git a/server/connection/grpc/stream_test.go 
b/server/connection/grpc/stream_test.go
index b20fa5d..6ee8eb3 100644
--- a/server/connection/grpc/stream_test.go
+++ b/server/connection/grpc/stream_test.go
@@ -41,7 +41,7 @@ func (x *grpcWatchServer) Context() context.Context {
 }
 
 func TestHandleWatchJob(t *testing.T) {
-       w := event.NewInstanceEventListWatcher("g", "s", nil)
+       w := event.NewInstanceSubscriber("g", "s", nil)
        w.Job <- nil
        err := stream.Handle(w, &grpcWatchServer{})
        if err == nil {
diff --git a/server/connection/ws/common.go b/server/connection/ws/common.go
new file mode 100644
index 0000000..15a060d
--- /dev/null
+++ b/server/connection/ws/common.go
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package ws
+
+import (
+       "context"
+       "github.com/apache/servicecomb-service-center/pkg/log"
+       "github.com/apache/servicecomb-service-center/pkg/registry"
+       "github.com/apache/servicecomb-service-center/pkg/util"
+       "github.com/apache/servicecomb-service-center/server/connection"
+       "github.com/apache/servicecomb-service-center/server/event"
+       "github.com/gorilla/websocket"
+)
+
+func ListAndWatch(ctx context.Context, serviceID string, f func() 
([]*registry.WatchInstanceResponse, int64), conn *websocket.Conn) {
+       domainProject := util.ParseDomainProject(ctx)
+       domain := util.ParseDomain(ctx)
+       socket := New(ctx, conn, event.NewInstanceSubscriber(serviceID, 
domainProject, f))
+
+       connection.ReportSubscriber(domain, Websocket, 1)
+       process(socket)
+       connection.ReportSubscriber(domain, Websocket, -1)
+}
+
+func process(socket *WebSocket) {
+       if err := socket.Init(); err != nil {
+               return
+       }
+
+       socket.HandleControlMessage()
+
+       socket.Stop()
+}
+
+func SendEstablishError(conn *websocket.Conn, err error) {
+       remoteAddr := conn.RemoteAddr().String()
+       log.Errorf(err, "establish[%s] websocket watch failed.", remoteAddr)
+       if err := conn.WriteMessage(websocket.TextMessage, 
util.StringToBytesWithNoCopy(err.Error())); err != nil {
+               log.Errorf(err, "establish[%s] websocket watch failed: write 
message failed.", remoteAddr)
+       }
+}
diff --git a/server/connection/ws/publisher.go 
b/server/connection/ws/keepalive.go
similarity index 80%
rename from server/connection/ws/publisher.go
rename to server/connection/ws/keepalive.go
index a8ee692..3bd7e8b 100644
--- a/server/connection/ws/publisher.go
+++ b/server/connection/ws/keepalive.go
@@ -24,34 +24,34 @@ import (
        "time"
 )
 
-var publisher *Publisher
+var runner *KeepaliveRunner
 
 func init() {
-       publisher = NewPublisher()
-       publisher.Run()
+       runner = NewRunner()
+       runner.Run()
 }
 
-type Publisher struct {
+type KeepaliveRunner struct {
        wss       []*WebSocket
        lock      sync.Mutex
        goroutine *gopool.Pool
 }
 
-func (wh *Publisher) Run() {
-       gopool.Go(publisher.loop)
+func (wh *KeepaliveRunner) Run() {
+       gopool.Go(runner.loop)
 }
 
-func (wh *Publisher) Stop() {
+func (wh *KeepaliveRunner) Stop() {
        wh.goroutine.Close(true)
 }
 
-func (wh *Publisher) dispatch(ws *WebSocket, payload interface{}) {
+func (wh *KeepaliveRunner) dispatch(ws *WebSocket, payload interface{}) {
        wh.goroutine.Do(func(ctx context.Context) {
                ws.HandleEvent(payload)
        })
 }
 
-func (wh *Publisher) loop(ctx context.Context) {
+func (wh *KeepaliveRunner) loop(ctx context.Context) {
        defer wh.Stop()
        ticker := time.NewTicker(500 * time.Millisecond)
        for {
@@ -91,18 +91,18 @@ func (wh *Publisher) loop(ctx context.Context) {
        }
 }
 
-func (wh *Publisher) Accept(ws *WebSocket) {
+func (wh *KeepaliveRunner) Accept(ws *WebSocket) {
        wh.lock.Lock()
        wh.wss = append(wh.wss, ws)
        wh.lock.Unlock()
 }
 
-func NewPublisher() *Publisher {
-       return &Publisher{
+func NewRunner() *KeepaliveRunner {
+       return &KeepaliveRunner{
                goroutine: gopool.New(context.Background()),
        }
 }
 
-func Instance() *Publisher {
-       return publisher
+func Runner() *KeepaliveRunner {
+       return runner
 }
diff --git a/server/connection/ws/options.go b/server/connection/ws/options.go
new file mode 100644
index 0000000..5817ef0
--- /dev/null
+++ b/server/connection/ws/options.go
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package ws
+
+import (
+       "time"
+
+       "github.com/apache/servicecomb-service-center/server/connection"
+)
+
+type Options struct {
+       ReadTimeout time.Duration
+       SendTimeout time.Duration
+}
+
+func ToOptions() Options {
+       return Options{
+               ReadTimeout: connection.ReadTimeout,
+               SendTimeout: connection.SendTimeout,
+       }
+}
diff --git a/server/connection/ws/websocket.go 
b/server/connection/ws/websocket.go
index d1d01a9..fc6db9c 100644
--- a/server/connection/ws/websocket.go
+++ b/server/connection/ws/websocket.go
@@ -34,11 +34,13 @@ import (
 const Websocket = "Websocket"
 
 type WebSocket struct {
+       Options
+
        ctx    context.Context
        ticker *time.Ticker
        conn   *websocket.Conn
        // watcher subscribe the notification service event
-       watcher         *event.InstanceEventListWatcher
+       watcher         *event.InstanceSubscriber
        needPingWatcher bool
        free            chan struct{}
        closed          chan struct{}
@@ -67,43 +69,20 @@ func (wh *WebSocket) Init() error {
                return err
        }
 
-       // put in publisher queue
-       Instance().Accept(wh)
+       // put in runner queue
+       Runner().Accept(wh)
 
        log.Debugf("start watching instance status, watcher[%s], subject: %s, 
group: %s",
                remoteAddr, wh.watcher.Subject(), wh.watcher.Group())
        return nil
 }
 
-func (wh *WebSocket) ReadTimeout() time.Duration {
-       return connection.ReadTimeout
-}
-
-func (wh *WebSocket) SendTimeout() time.Duration {
-       return connection.SendTimeout
-}
-
-func (wh *WebSocket) Heartbeat(messageType int) error {
-       err := wh.conn.WriteControl(messageType, []byte{}, 
time.Now().Add(wh.SendTimeout()))
-       if err != nil {
-               messageTypeName := "Ping"
-               if messageType == websocket.PongMessage {
-                       messageTypeName = "Pong"
-               }
-               log.Errorf(err, "fail to send '%s' to watcher[%s], subject: %s, 
group: %s",
-                       messageTypeName, wh.conn.RemoteAddr(), 
wh.watcher.Subject(), wh.watcher.Group())
-               //wh.watcher.SetError(err)
-               return err
-       }
-       return nil
-}
-
 func (wh *WebSocket) HandleControlMessage() {
        remoteAddr := wh.conn.RemoteAddr().String()
        // PING
        wh.conn.SetPingHandler(func(message string) error {
                defer func() {
-                       err := 
wh.conn.SetReadDeadline(time.Now().Add(wh.ReadTimeout()))
+                       err := 
wh.conn.SetReadDeadline(time.Now().Add(wh.ReadTimeout))
                        if err != nil {
                                log.Error("", err)
                        }
@@ -113,12 +92,12 @@ func (wh *WebSocket) HandleControlMessage() {
                                message, remoteAddr, wh.watcher.Subject(), 
wh.watcher.Group())
                }
                wh.needPingWatcher = false
-               return wh.Heartbeat(websocket.PongMessage)
+               return wh.WritePingPong(websocket.PongMessage)
        })
        // PONG
        wh.conn.SetPongHandler(func(message string) error {
                defer func() {
-                       err := 
wh.conn.SetReadDeadline(time.Now().Add(wh.ReadTimeout()))
+                       err := 
wh.conn.SetReadDeadline(time.Now().Add(wh.ReadTimeout))
                        if err != nil {
                                log.Error("", err)
                        }
@@ -135,7 +114,7 @@ func (wh *WebSocket) HandleControlMessage() {
        })
 
        wh.conn.SetReadLimit(connection.ReadMaxBody)
-       err := wh.conn.SetReadDeadline(time.Now().Add(wh.ReadTimeout()))
+       err := wh.conn.SetReadDeadline(time.Now().Add(wh.ReadTimeout))
        if err != nil {
                log.Error("", err)
        }
@@ -155,7 +134,7 @@ func (wh *WebSocket) sendClose(code int, text string) error 
{
        if code != websocket.CloseNoStatusReceived {
                message = websocket.FormatCloseMessage(code, text)
        }
-       err := wh.conn.WriteControl(websocket.CloseMessage, message, 
time.Now().Add(wh.SendTimeout()))
+       err := wh.conn.WriteControl(websocket.CloseMessage, message, 
time.Now().Add(wh.SendTimeout))
        if err != nil {
                log.Errorf(err, "watcher[%s] catch an err, subject: %s, group: 
%s",
                        remoteAddr, wh.watcher.Subject(), wh.watcher.Group())
@@ -164,7 +143,7 @@ func (wh *WebSocket) sendClose(code int, text string) error 
{
        return nil
 }
 
-// Pick will be called by publisher
+// Pick will be called by runner
 func (wh *WebSocket) Pick() interface{} {
        select {
        case <-wh.Ready():
@@ -194,84 +173,98 @@ func (wh *WebSocket) Pick() interface{} {
 func (wh *WebSocket) HandleEvent(o interface{}) {
        defer wh.SetReady()
 
-       var (
-               message    []byte
-               remoteAddr = wh.conn.RemoteAddr().String()
-       )
-
+       var remoteAddr = wh.conn.RemoteAddr().String()
        switch o := o.(type) {
        case error:
                log.Errorf(o, "watcher[%s] catch an err, subject: %s, group: 
%s",
                        remoteAddr, wh.watcher.Subject(), wh.watcher.Group())
-
-               message = util.StringToBytesWithNoCopy(fmt.Sprintf("watcher 
catch an err: %s", o.Error()))
+               _ = wh.write(util.StringToBytesWithNoCopy(fmt.Sprintf("watcher 
catch an err: %s", o.Error())))
        case time.Time:
-               domainProject := util.ParseDomainProject(wh.ctx)
-               if !serviceUtil.ServiceExist(wh.ctx, domainProject, 
wh.watcher.Group()) {
-                       message = util.StringToBytesWithNoCopy("Service does 
not exit.")
-                       break
-               }
+               wh.Keepalive()
+       case *event.InstanceEvent:
+               wh.WriteInstanceEvent(o)
+       default:
+               log.Errorf(nil, "watcher[%s] unknown input %v, subject: %s, 
group: %s",
+                       remoteAddr, o, wh.watcher.Subject(), wh.watcher.Group())
+       }
+}
 
-               if !wh.needPingWatcher {
-                       return
-               }
+func (wh *WebSocket) Keepalive() {
+       domainProject := util.ParseDomainProject(wh.ctx)
+       if !serviceUtil.ServiceExist(wh.ctx, domainProject, wh.watcher.Group()) 
{
+               _ = wh.write(util.StringToBytesWithNoCopy("Service does not 
exit."))
+               return
+       }
 
-               if err := wh.Heartbeat(websocket.PingMessage); err != nil {
-                       log.Errorf(err, "send 'Ping' message to watcher[%s] 
failed, subject: %s, group: %s",
-                               remoteAddr, wh.watcher.Subject(), 
wh.watcher.Group())
-                       return
-               }
+       if !wh.needPingWatcher {
+               return
+       }
 
-               log.Debugf("send 'Ping' message to watcher[%s], subject: %s, 
group: %s",
+       remoteAddr := wh.conn.RemoteAddr().String()
+       if err := wh.WritePingPong(websocket.PingMessage); err != nil {
+               log.Errorf(err, "send 'Ping' message to watcher[%s] failed, 
subject: %s, group: %s",
                        remoteAddr, wh.watcher.Subject(), wh.watcher.Group())
                return
-       case *event.InstanceEvent:
-               resp := o.Response
+       }
 
-               providerFlag := fmt.Sprintf("%s/%s/%s", resp.Key.AppId, 
resp.Key.ServiceName, resp.Key.Version)
-               if resp.Action != string(pb.EVT_EXPIRE) {
-                       providerFlag = fmt.Sprintf("%s/%s(%s)", 
resp.Instance.ServiceId, resp.Instance.InstanceId, providerFlag)
-               }
-               log.Infof("event[%s] is coming in, watcher[%s] watch %s, 
subject: %s, group: %s",
-                       resp.Action, remoteAddr, providerFlag, 
wh.watcher.Subject(), wh.watcher.Group())
+       log.Debugf("send 'Ping' message to watcher[%s], subject: %s, group: %s",
+               remoteAddr, wh.watcher.Subject(), wh.watcher.Group())
+       return
+}
 
-               resp.Response = nil
-               data, err := json.Marshal(resp)
-               if err != nil {
-                       log.Errorf(err, "watcher[%s] watch %s, subject: %s, 
group: %s",
-                               remoteAddr, providerFlag, o, 
wh.watcher.Subject(), wh.watcher.Group())
-                       message = 
util.StringToBytesWithNoCopy(fmt.Sprintf("marshal output file error, %s", 
err.Error()))
-                       break
+func (wh *WebSocket) WritePingPong(messageType int) error {
+       err := wh.conn.WriteControl(messageType, []byte{}, 
time.Now().Add(wh.SendTimeout))
+       if err != nil {
+               messageTypeName := "Ping"
+               if messageType == websocket.PongMessage {
+                       messageTypeName = "Pong"
                }
-               message = data
-       default:
-               log.Errorf(nil, "watcher[%s] unknown input %v, subject: %s, 
group: %s",
-                       remoteAddr, o, wh.watcher.Subject(), wh.watcher.Group())
-               return
+               log.Errorf(err, "fail to send '%s' to watcher[%s], subject: %s, 
group: %s",
+                       messageTypeName, wh.conn.RemoteAddr(), 
wh.watcher.Subject(), wh.watcher.Group())
+               //wh.watcher.SetError(err)
+               return err
        }
+       return nil
+}
 
-       select {
-       case <-wh.closed:
-               return
-       default:
+func (wh *WebSocket) WriteInstanceEvent(evt *event.InstanceEvent) {
+       resp := evt.Response
+       providerFlag := fmt.Sprintf("%s/%s/%s", resp.Key.AppId, 
resp.Key.ServiceName, resp.Key.Version)
+       if resp.Action != string(pb.EVT_EXPIRE) {
+               providerFlag = fmt.Sprintf("%s/%s(%s)", 
resp.Instance.ServiceId, resp.Instance.InstanceId, providerFlag)
        }
+       remoteAddr := wh.conn.RemoteAddr().String()
+       log.Infof("event[%s] is coming in, watcher[%s] watch %s, subject: %s, 
group: %s",
+               resp.Action, remoteAddr, providerFlag, wh.watcher.Subject(), 
wh.watcher.Group())
 
-       err := wh.WriteMessage(message)
-       if evt, ok := o.(*event.InstanceEvent); ok {
-               connection.ReportPublishCompleted(evt, err)
-       }
+       resp.Response = nil
+       data, err := json.Marshal(resp)
        if err != nil {
-               log.Errorf(err, "watcher[%s] catch an err, subject: %s, group: 
%s",
-                       remoteAddr, wh.watcher.Subject(), wh.watcher.Group())
+               log.Errorf(err, "watcher[%s] watch %s, subject: %s, group: %s",
+                       remoteAddr, providerFlag, evt, wh.watcher.Subject(), 
wh.watcher.Group())
+               data = util.StringToBytesWithNoCopy(fmt.Sprintf("marshal output 
file error, %s", err.Error()))
        }
+       err = wh.write(data)
+       connection.ReportPublishCompleted(evt, err)
 }
 
-func (wh *WebSocket) WriteMessage(message []byte) error {
-       err := wh.conn.SetWriteDeadline(time.Now().Add(wh.SendTimeout()))
+func (wh *WebSocket) write(message []byte) error {
+       select {
+       case <-wh.closed:
+               return nil
+       default:
+       }
+
+       err := wh.conn.SetWriteDeadline(time.Now().Add(wh.SendTimeout))
        if err != nil {
                return err
        }
-       return wh.conn.WriteMessage(websocket.TextMessage, message)
+       err = wh.conn.WriteMessage(websocket.TextMessage, message)
+       if err != nil {
+               log.Errorf(err, "watcher[%s] catch an err, subject: %s, group: 
%s",
+                       wh.conn.RemoteAddr().String(), wh.watcher.Subject(), 
wh.watcher.Group())
+       }
+       return err
 }
 
 func (wh *WebSocket) Ready() <-chan struct{} {
@@ -289,36 +282,9 @@ func (wh *WebSocket) Stop() {
        close(wh.closed)
 }
 
-func ListAndWatch(ctx context.Context, serviceID string, f func() 
([]*pb.WatchInstanceResponse, int64), conn *websocket.Conn) {
-       domainProject := util.ParseDomainProject(ctx)
-       domain := util.ParseDomain(ctx)
-       socket := New(ctx, conn, event.NewInstanceEventListWatcher(serviceID, 
domainProject, f))
-
-       connection.ReportSubscriber(domain, Websocket, 1)
-       process(socket)
-       connection.ReportSubscriber(domain, Websocket, -1)
-}
-
-func process(socket *WebSocket) {
-       if err := socket.Init(); err != nil {
-               return
-       }
-
-       socket.HandleControlMessage()
-
-       socket.Stop()
-}
-
-func SendEstablishError(conn *websocket.Conn, err error) {
-       remoteAddr := conn.RemoteAddr().String()
-       log.Errorf(err, "establish[%s] websocket watch failed.", remoteAddr)
-       if err := conn.WriteMessage(websocket.TextMessage, 
util.StringToBytesWithNoCopy(err.Error())); err != nil {
-               log.Errorf(err, "establish[%s] websocket watch failed: write 
message failed.", remoteAddr)
-       }
-}
-
-func New(ctx context.Context, conn *websocket.Conn, watcher 
*event.InstanceEventListWatcher) *WebSocket {
+func New(ctx context.Context, conn *websocket.Conn, watcher 
*event.InstanceSubscriber) *WebSocket {
        return &WebSocket{
+               Options: ToOptions(),
                ctx:     ctx,
                conn:    conn,
                watcher: watcher,
diff --git a/server/connection/ws/websocket_test.go 
b/server/connection/ws/websocket_test.go
index b7895eb..8338b8f 100644
--- a/server/connection/ws/websocket_test.go
+++ b/server/connection/ws/websocket_test.go
@@ -68,7 +68,7 @@ func TestDoWebSocketListAndWatch(t *testing.T) {
 
        wss.SendEstablishError(conn, errors.New("error"))
 
-       w := event.NewInstanceEventListWatcher("g", "s", func() (results 
[]*registry.WatchInstanceResponse, rev int64) {
+       w := event.NewInstanceSubscriber("g", "s", func() (results 
[]*registry.WatchInstanceResponse, rev int64) {
                results = append(results, &registry.WatchInstanceResponse{
                        Response: proto.CreateResponse(proto.Response_SUCCESS, 
"ok"),
                        Action:   string(registry.EVT_CREATE),
@@ -89,7 +89,7 @@ func TestDoWebSocketListAndWatch(t *testing.T) {
        go func() {
                wss.ListAndWatch(context.Background(), "", nil, conn)
 
-               w2 := event.NewInstanceEventListWatcher("g", "s", func() 
(results []*registry.WatchInstanceResponse, rev int64) {
+               w2 := event.NewInstanceSubscriber("g", "s", func() (results 
[]*registry.WatchInstanceResponse, rev int64) {
                        return
                })
                ws2 := wss.New(context.Background(), conn, w2)
@@ -115,8 +115,8 @@ func TestDoWebSocketListAndWatch(t *testing.T) {
 
        ws.HandleEvent(nil)
 
-       ws.Heartbeat(websocket.PingMessage)
-       ws.Heartbeat(websocket.PongMessage)
+       ws.WritePingPong(websocket.PingMessage)
+       ws.WritePingPong(websocket.PongMessage)
 
        ws.HandleEvent(time.Now())
 
@@ -124,10 +124,10 @@ func TestDoWebSocketListAndWatch(t *testing.T) {
 
        <-time.After(time.Second)
 
-       ws.Heartbeat(websocket.PingMessage)
-       ws.Heartbeat(websocket.PongMessage)
+       ws.WritePingPong(websocket.PingMessage)
+       ws.WritePingPong(websocket.PongMessage)
 
        w.OnMessage(nil)
 
-       wss.Instance().Stop()
+       wss.Runner().Stop()
 }
diff --git a/server/event/instance_event.go b/server/event/instance_event.go
new file mode 100644
index 0000000..0098187
--- /dev/null
+++ b/server/event/instance_event.go
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package event
+
+import (
+       "github.com/apache/servicecomb-service-center/pkg/event"
+       pb "github.com/apache/servicecomb-service-center/pkg/registry"
+       simple "github.com/apache/servicecomb-service-center/pkg/time"
+)
+
+const QueueSize = 5000
+
+var INSTANCE = event.RegisterType("INSTANCE", QueueSize)
+
+// InstanceEvent is the notification pushed on a status change
+type InstanceEvent struct {
+       event.Event
+       Revision int64
+       Response *pb.WatchInstanceResponse
+}
+
+func NewInstanceEvent(serviceID, domainProject string, rev int64, response 
*pb.WatchInstanceResponse) *InstanceEvent {
+       return &InstanceEvent{
+               Event:    event.NewEvent(INSTANCE, domainProject, serviceID),
+               Revision: rev,
+               Response: response,
+       }
+}
+
+func NewInstanceEventWithTime(serviceID, domainProject string, rev int64, 
createAt simple.Time, response *pb.WatchInstanceResponse) *InstanceEvent {
+       return &InstanceEvent{
+               Event:    event.NewEventWithTime(INSTANCE, domainProject, 
serviceID, createAt),
+               Revision: rev,
+               Response: response,
+       }
+}
diff --git a/server/event/instance_subscriber.go 
b/server/event/instance_subscriber.go
index e13d0e9..c5fad5a 100644
--- a/server/event/instance_subscriber.go
+++ b/server/event/instance_subscriber.go
@@ -23,25 +23,12 @@ import (
        "github.com/apache/servicecomb-service-center/pkg/gopool"
        "github.com/apache/servicecomb-service-center/pkg/log"
        pb "github.com/apache/servicecomb-service-center/pkg/registry"
-       simple "github.com/apache/servicecomb-service-center/pkg/time"
        "time"
 )
 
-const (
-       AddJobTimeout  = 1 * time.Second
-       EventQueueSize = 5000
-)
-
-var INSTANCE = event.RegisterType("INSTANCE", EventQueueSize)
+const AddJobTimeout = 1 * time.Second
 
-// 状态变化推送
-type InstanceEvent struct {
-       event.Event
-       Revision int64
-       Response *pb.WatchInstanceResponse
-}
-
-type InstanceEventListWatcher struct {
+type InstanceSubscriber struct {
        event.Subscriber
        Job          chan *InstanceEvent
        ListRevision int64
@@ -49,7 +36,7 @@ type InstanceEventListWatcher struct {
        listCh       chan struct{}
 }
 
-func (w *InstanceEventListWatcher) SetError(err error) {
+func (w *InstanceSubscriber) SetError(err error) {
        w.Subscriber.SetError(err)
        // 触发清理job
        e := w.Bus().Fire(event.NewUnhealthyEvent(w))
@@ -58,7 +45,7 @@ func (w *InstanceEventListWatcher) SetError(err error) {
        }
 }
 
-func (w *InstanceEventListWatcher) OnAccept() {
+func (w *InstanceSubscriber) OnAccept() {
        if w.Err() != nil {
                return
        }
@@ -66,7 +53,7 @@ func (w *InstanceEventListWatcher) OnAccept() {
        gopool.Go(w.listAndPublishJobs)
 }
 
-func (w *InstanceEventListWatcher) listAndPublishJobs(_ context.Context) {
+func (w *InstanceSubscriber) listAndPublishJobs(_ context.Context) {
        defer close(w.listCh)
        if w.ListFunc == nil {
                return
@@ -79,7 +66,7 @@ func (w *InstanceEventListWatcher) listAndPublishJobs(_ 
context.Context) {
 }
 
 //被通知
-func (w *InstanceEventListWatcher) OnMessage(job event.Event) {
+func (w *InstanceSubscriber) OnMessage(job event.Event) {
        if w.Err() != nil {
                return
        }
@@ -111,7 +98,7 @@ func (w *InstanceEventListWatcher) OnMessage(job 
event.Event) {
        w.sendMessage(wJob)
 }
 
-func (w *InstanceEventListWatcher) sendMessage(job *InstanceEvent) {
+func (w *InstanceSubscriber) sendMessage(job *InstanceEvent) {
        defer log.Recover()
        select {
        case w.Job <- job:
@@ -128,33 +115,17 @@ func (w *InstanceEventListWatcher) sendMessage(job 
*InstanceEvent) {
        }
 }
 
-func (w *InstanceEventListWatcher) Timeout() time.Duration {
+func (w *InstanceSubscriber) Timeout() time.Duration {
        return AddJobTimeout
 }
 
-func (w *InstanceEventListWatcher) Close() {
+func (w *InstanceSubscriber) Close() {
        close(w.Job)
 }
 
-func NewInstanceEvent(serviceID, domainProject string, rev int64, response 
*pb.WatchInstanceResponse) *InstanceEvent {
-       return &InstanceEvent{
-               Event:    event.NewEvent(INSTANCE, domainProject, serviceID),
-               Revision: rev,
-               Response: response,
-       }
-}
-
-func NewInstanceEventWithTime(serviceID, domainProject string, rev int64, 
createAt simple.Time, response *pb.WatchInstanceResponse) *InstanceEvent {
-       return &InstanceEvent{
-               Event:    event.NewEventWithTime(INSTANCE, domainProject, 
serviceID, createAt),
-               Revision: rev,
-               Response: response,
-       }
-}
-
-func NewInstanceEventListWatcher(serviceID, domainProject string,
-       listFunc func() (results []*pb.WatchInstanceResponse, rev int64)) 
*InstanceEventListWatcher {
-       watcher := &InstanceEventListWatcher{
+func NewInstanceSubscriber(serviceID, domainProject string,
+       listFunc func() (results []*pb.WatchInstanceResponse, rev int64)) 
*InstanceSubscriber {
+       watcher := &InstanceSubscriber{
                Subscriber: event.NewSubscriber(INSTANCE, domainProject, 
serviceID),
                Job:        make(chan *InstanceEvent, INSTANCE.QueueSize()),
                ListFunc:   listFunc,

Reply via email to