This is an automated email from the ASF dual-hosted git repository.

tianxiaoliang pushed a commit to branch v1.x
in repository https://gitbox.apache.org/repos/asf/servicecomb-service-center.git


The following commit(s) were added to refs/heads/v1.x by this push:
     new db013af  SCB-2176 Refactor websocket (#981)
db013af is described below

commit db013afc9c9d1673999190b9ceb967a08e673813
Author: little-cui <[email protected]>
AuthorDate: Mon May 17 20:00:11 2021 +0800

    SCB-2176 Refactor websocket (#981)
    
    * SCB-2176 Refactor websocket
    
    * SCB-2176 Add UTs
---
 integration/apis.go                                |   1 -
 integration/instances_test.go                      |  12 -
 server/connection/grpc/stream.go                   |   5 +-
 server/connection/grpc/stream_test.go              |   4 +-
 server/connection/ws/broker.go                     |  81 +++++++
 .../connection/ws/{options.go => broker_test.go}   |  30 ++-
 server/connection/ws/common.go                     |  33 ++-
 .../services.go => connection/ws/common_test.go}   |  35 ++-
 .../ws/{keepalive.go => health_check.go}           |  90 ++++----
 .../ws/{options.go => health_check_test.go}        |  29 ++-
 server/connection/ws/options.go                    |  10 +-
 server/connection/ws/websocket.go                  | 244 +++++++--------------
 server/connection/ws/websocket_test.go             | 187 +++++++++-------
 server/core/proto/services.go                      |   1 -
 server/event/instance_subscriber.go                |  46 +---
 server/rest/controller/v3/instance_watcher.go      |   1 -
 server/rest/controller/v4/instance_watcher.go      |  14 --
 server/service/watch.go                            |  15 +-
 server/service/watch_test.go                       |   7 -
 19 files changed, 412 insertions(+), 433 deletions(-)

diff --git a/integration/apis.go b/integration/apis.go
index d263a6b..f8aa164 100644
--- a/integration/apis.go
+++ b/integration/apis.go
@@ -47,7 +47,6 @@ var UPDATEINSTANCEMETADATA = "/v4/default/registry/microservices/:serviceId/inst
 var UPDATEINSTANCESTATUS = 
"/v4/default/registry/microservices/:serviceId/instances/:instanceId/status"
 var INSTANCEHEARTBEAT = 
"/v4/default/registry/microservices/:serviceId/instances/:instanceId/heartbeat"
 var INSTANCEWATCHER = "/v4/default/registry/microservices/:serviceId/watcher"
-var INSTANCELISTWATCHER = 
"/v4/default/registry/microservices/:serviceId/listwatcher"
 
 //Governance API's
 var GETGOVERNANCESERVICEDETAILS = "/v4/default/govern/microservices/:serviceId"
diff --git a/integration/instances_test.go b/integration/instances_test.go
index 0f90d31..b84fbc4 100644
--- a/integration/instances_test.go
+++ b/integration/instances_test.go
@@ -599,18 +599,6 @@ var _ = Describe("MicroService Api Test", func() {
 
                                
Expect(resp.StatusCode).To(Equal(http.StatusBadRequest))
                        })
-                       It("Call the listwatcher API ", func() {
-                               //This api gives 400 bad request for the 
integration test
-                               // as integration test is not able to make ws 
connection
-                               url := strings.Replace(INSTANCELISTWATCHER, 
":serviceId", serviceId, 1)
-                               req, _ := http.NewRequest(GET, SCURL+url, nil)
-                               req.Header.Set("X-Domain-Name", "default")
-                               resp, err := scclient.Do(req)
-                               Expect(err).To(BeNil())
-                               defer resp.Body.Close()
-
-                               
Expect(resp.StatusCode).To(Equal(http.StatusBadRequest))
-                       })
                })
        })
 
diff --git a/server/connection/grpc/stream.go b/server/connection/grpc/stream.go
index 0609bdb..0e5415e 100644
--- a/server/connection/grpc/stream.go
+++ b/server/connection/grpc/stream.go
@@ -21,7 +21,6 @@ import (
        "context"
        "errors"
        "github.com/apache/servicecomb-service-center/pkg/log"
-       pb "github.com/apache/servicecomb-service-center/pkg/registry"
        "github.com/apache/servicecomb-service-center/pkg/util"
        "github.com/apache/servicecomb-service-center/server/connection"
        "github.com/apache/servicecomb-service-center/server/core/proto"
@@ -67,10 +66,10 @@ func Handle(watcher *event.InstanceSubscriber, stream proto.ServiceInstanceCtrl_
        }
 }
 
-func ListAndWatch(ctx context.Context, serviceID string, f func() 
([]*pb.WatchInstanceResponse, int64), stream 
proto.ServiceInstanceCtrl_WatchServer) (err error) {
+func Watch(ctx context.Context, serviceID string, stream 
proto.ServiceInstanceCtrl_WatchServer) (err error) {
        domainProject := util.ParseDomainProject(ctx)
        domain := util.ParseDomain(ctx)
-       watcher := event.NewInstanceSubscriber(serviceID, domainProject, f)
+       watcher := event.NewInstanceSubscriber(serviceID, domainProject)
        err = event.Center().AddSubscriber(watcher)
        if err != nil {
                return
diff --git a/server/connection/grpc/stream_test.go b/server/connection/grpc/stream_test.go
index 6ee8eb3..c549b65 100644
--- a/server/connection/grpc/stream_test.go
+++ b/server/connection/grpc/stream_test.go
@@ -41,7 +41,7 @@ func (x *grpcWatchServer) Context() context.Context {
 }
 
 func TestHandleWatchJob(t *testing.T) {
-       w := event.NewInstanceSubscriber("g", "s", nil)
+       w := event.NewInstanceSubscriber("g", "s")
        w.Job <- nil
        err := stream.Handle(w, &grpcWatchServer{})
        if err == nil {
@@ -54,6 +54,6 @@ func TestHandleWatchJob(t *testing.T) {
 
 func TestDoStreamListAndWatch(t *testing.T) {
        defer log.Recover()
-       err := stream.ListAndWatch(context.Background(), "s", nil, nil)
+       err := stream.Watch(context.Background(), "s", nil)
        t.Fatal("TestDoStreamListAndWatch failed", err)
 }
diff --git a/server/connection/ws/broker.go b/server/connection/ws/broker.go
new file mode 100644
index 0000000..e4901ff
--- /dev/null
+++ b/server/connection/ws/broker.go
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package ws
+
+import (
+       "context"
+       "encoding/json"
+       "fmt"
+       "github.com/apache/servicecomb-service-center/pkg/log"
+       pb "github.com/apache/servicecomb-service-center/pkg/registry"
+       "github.com/apache/servicecomb-service-center/pkg/util"
+       "github.com/apache/servicecomb-service-center/server/connection"
+       "github.com/apache/servicecomb-service-center/server/event"
+)
+
+type Broker struct {
+       consumer *WebSocket
+       producer *event.InstanceSubscriber
+}
+
+func (b *Broker) Listen(ctx context.Context) error {
+       for {
+               select {
+               case <-ctx.Done():
+                       return ctx.Err()
+               case instanceEvent, ok := <-b.producer.Job:
+                       if !ok {
+                               return fmt.Errorf("read producer[%v] event 
failed", b.producer.Group())
+                       }
+                       err := b.write(instanceEvent)
+                       if err != nil {
+                               log.Errorf(err, "write instance event to 
subscriber[%s] failed, group: %s",
+                                       b.consumer.RemoteAddr, 
b.producer.Group())
+                               return err
+                       }
+               }
+       }
+}
+func (b *Broker) write(evt *event.InstanceEvent) error {
+       resp := evt.Response
+       providerFlag := fmt.Sprintf("%s/%s/%s", resp.Key.AppId, 
resp.Key.ServiceName, resp.Key.Version)
+       if resp.Action != string(pb.EVT_EXPIRE) {
+               providerFlag = fmt.Sprintf("%s/%s(%s)", 
resp.Instance.ServiceId, resp.Instance.InstanceId, providerFlag)
+       }
+       remoteAddr := b.consumer.Conn.RemoteAddr().String()
+       log.Infof("event[%s] is coming in, subscriber[%s] watch %s, group: %s",
+               resp.Action, remoteAddr, providerFlag, b.producer.Group())
+
+       resp.Response = nil
+       data, err := json.Marshal(resp)
+       if err != nil {
+               log.Errorf(err, "subscriber[%s] watch %s, group: %s",
+                       remoteAddr, providerFlag, b.producer.Group())
+               data = util.StringToBytesWithNoCopy(fmt.Sprintf("marshal output 
file error, %s", err.Error()))
+       }
+       err = b.consumer.WriteTextMessage(data)
+       connection.ReportPublishCompleted(evt, err)
+       return err
+}
+
+func NewBroker(ws *WebSocket, is *event.InstanceSubscriber) *Broker {
+       return &Broker{
+               consumer: ws,
+               producer: is,
+       }
+}
diff --git a/server/connection/ws/options.go b/server/connection/ws/broker_test.go
similarity index 59%
copy from server/connection/ws/options.go
copy to server/connection/ws/broker_test.go
index 5817ef0..a3597c3 100644
--- a/server/connection/ws/options.go
+++ b/server/connection/ws/broker_test.go
@@ -15,22 +15,28 @@
  * limitations under the License.
  */
 
-package ws
+package ws_test
 
 import (
-       "time"
-
-       "github.com/apache/servicecomb-service-center/server/connection"
+       "context"
+       "github.com/apache/servicecomb-service-center/server/connection/ws"
+       "github.com/apache/servicecomb-service-center/server/event"
+       "github.com/stretchr/testify/assert"
+       "testing"
 )
 
-type Options struct {
-       ReadTimeout time.Duration
-       SendTimeout time.Duration
+func TestNewBroker(t *testing.T) {
+       t.Run("should not return nil when new broker", func(t *testing.T) {
+               assert.NotNil(t, ws.NewBroker(nil, nil))
+
+       })
 }
 
-func ToOptions() Options {
-       return Options{
-               ReadTimeout: connection.ReadTimeout,
-               SendTimeout: connection.SendTimeout,
-       }
+func TestBroker_Listen(t *testing.T) {
+       t.Run("should return err when listen context cancelled", func(t 
*testing.T) {
+               broker := ws.NewBroker(nil, event.NewInstanceSubscriber("", ""))
+               ctx, cancel := context.WithCancel(context.Background())
+               cancel()
+               assert.Equal(t, context.Canceled, broker.Listen(ctx))
+       })
 }
diff --git a/server/connection/ws/common.go b/server/connection/ws/common.go
index 15a060d..0132202 100644
--- a/server/connection/ws/common.go
+++ b/server/connection/ws/common.go
@@ -19,32 +19,43 @@ package ws
 
 import (
        "context"
+       "fmt"
+       "github.com/apache/servicecomb-service-center/pkg/gopool"
        "github.com/apache/servicecomb-service-center/pkg/log"
-       "github.com/apache/servicecomb-service-center/pkg/registry"
        "github.com/apache/servicecomb-service-center/pkg/util"
        "github.com/apache/servicecomb-service-center/server/connection"
        "github.com/apache/servicecomb-service-center/server/event"
        "github.com/gorilla/websocket"
 )
 
-func ListAndWatch(ctx context.Context, serviceID string, f func() 
([]*registry.WatchInstanceResponse, int64), conn *websocket.Conn) {
+func Watch(ctx context.Context, serviceID string, conn *websocket.Conn) {
        domainProject := util.ParseDomainProject(ctx)
        domain := util.ParseDomain(ctx)
-       socket := New(ctx, conn, event.NewInstanceSubscriber(serviceID, 
domainProject, f))
 
-       connection.ReportSubscriber(domain, Websocket, 1)
-       process(socket)
-       connection.ReportSubscriber(domain, Websocket, -1)
-}
+       ws := NewWebSocket(domainProject, serviceID, conn)
+       HealthChecker().Accept(ws)
 
-func process(socket *WebSocket) {
-       if err := socket.Init(); err != nil {
+       subscriber := event.NewInstanceSubscriber(serviceID, domainProject)
+       err := event.Center().AddSubscriber(subscriber)
+       if err != nil {
+               SendEstablishError(conn, err)
                return
        }
 
-       socket.HandleControlMessage()
+       connection.ReportSubscriber(domain, Websocket, 1)
+       defer connection.ReportSubscriber(domain, Websocket, -1)
 
-       socket.Stop()
+       pool := gopool.New(ctx).Do(func(ctx context.Context) {
+               if err := NewBroker(ws, subscriber).Listen(ctx); err != nil {
+                       log.Error(fmt.Sprintf("[%s] listen service[%s] failed", 
conn.RemoteAddr(), serviceID), err)
+               }
+       })
+       defer pool.Done()
+
+       if err := ws.ReadMessage(); err != nil {
+               log.Error(fmt.Sprintf("[%s] handle service[%s] control message 
failed", conn.RemoteAddr(), serviceID), err)
+               subscriber.SetError(err)
+       }
 }
 
 func SendEstablishError(conn *websocket.Conn, err error) {
diff --git a/server/core/proto/services.go b/server/connection/ws/common_test.go
similarity index 53%
copy from server/core/proto/services.go
copy to server/connection/ws/common_test.go
index 12f6af4..df8ae55 100644
--- a/server/core/proto/services.go
+++ b/server/connection/ws/common_test.go
@@ -14,21 +14,34 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package proto
+
+package ws_test
 
 import (
        "context"
-       "github.com/apache/servicecomb-service-center/pkg/registry"
-       "github.com/gorilla/websocket"
-)
-
-type ServiceInstanceCtrlServerEx interface {
-       ServiceInstanceCtrlServer
+       "errors"
+       "testing"
 
-       BatchFind(ctx context.Context, in *registry.BatchFindInstancesRequest) 
(*registry.BatchFindInstancesResponse, error)
+       wss "github.com/apache/servicecomb-service-center/server/connection/ws"
+       "github.com/stretchr/testify/assert"
+)
 
-       WebSocketWatch(ctx context.Context, in *registry.WatchInstanceRequest, 
conn *websocket.Conn)
-       WebSocketListAndWatch(ctx context.Context, in 
*registry.WatchInstanceRequest, conn *websocket.Conn)
+func TestSendEstablishError(t *testing.T) {
+       mock := NewTest()
+       t.Run("should read the err when call", func(t *testing.T) {
+               wss.SendEstablishError(mock.ServerConn, errors.New("error"))
+               _, message, err := mock.ClientConn.ReadMessage()
+               assert.Nil(t, err)
+               assert.Equal(t, "error", string(message))
+       })
+}
 
-       ClusterHealth(ctx context.Context) (*registry.GetInstancesResponse, 
error)
+func TestWatch(t *testing.T) {
+       t.Run("should return when ctx cancelled", func(t *testing.T) {
+               mock := NewTest()
+               mock.ServerConn.Close()
+               ctx, cancel := context.WithCancel(context.Background())
+               cancel()
+               wss.Watch(ctx, "", mock.ServerConn)
+       })
 }
diff --git a/server/connection/ws/keepalive.go b/server/connection/ws/health_check.go
similarity index 57%
rename from server/connection/ws/keepalive.go
rename to server/connection/ws/health_check.go
index 3bd7e8b..aeb4f38 100644
--- a/server/connection/ws/keepalive.go
+++ b/server/connection/ws/health_check.go
@@ -19,39 +19,36 @@ package ws
 
 import (
        "context"
-       "github.com/apache/servicecomb-service-center/pkg/gopool"
+       "fmt"
        "sync"
        "time"
+
+       "github.com/apache/servicecomb-service-center/pkg/gopool"
+       "github.com/apache/servicecomb-service-center/pkg/log"
 )
 
-var runner *KeepaliveRunner
+var checker *HealthCheck
 
 func init() {
-       runner = NewRunner()
-       runner.Run()
+       checker = NewHealthCheck()
+       checker.Run()
 }
 
-type KeepaliveRunner struct {
+type HealthCheck struct {
        wss       []*WebSocket
        lock      sync.Mutex
        goroutine *gopool.Pool
 }
 
-func (wh *KeepaliveRunner) Run() {
-       gopool.Go(runner.loop)
+func (wh *HealthCheck) Run() {
+       gopool.Go(checker.loop)
 }
 
-func (wh *KeepaliveRunner) Stop() {
+func (wh *HealthCheck) Stop() {
        wh.goroutine.Close(true)
 }
 
-func (wh *KeepaliveRunner) dispatch(ws *WebSocket, payload interface{}) {
-       wh.goroutine.Do(func(ctx context.Context) {
-               ws.HandleEvent(payload)
-       })
-}
-
-func (wh *KeepaliveRunner) loop(ctx context.Context) {
+func (wh *HealthCheck) loop(ctx context.Context) {
        defer wh.Stop()
        ticker := time.NewTicker(500 * time.Millisecond)
        for {
@@ -60,49 +57,52 @@ func (wh *KeepaliveRunner) loop(ctx context.Context) {
                        // server shutdown
                        return
                case <-ticker.C:
-                       var removes []int
-                       for i, ws := range wh.wss {
-                               if payload := ws.Pick(); payload != nil {
-                                       if _, ok := payload.(error); ok {
-                                               removes = append(removes, i)
-                                       }
-                                       wh.dispatch(ws, payload)
+                       for _, ws := range wh.wss {
+                               if t := ws.NeedCheck(); t == nil {
+                                       continue
                                }
+                               wh.check(ws)
                        }
-                       if len(removes) == 0 {
-                               continue
-                       }
-
-                       wh.lock.Lock()
-                       var (
-                               news []*WebSocket
-                               s    int
-                       )
-                       for _, e := range removes {
-                               news = append(news, wh.wss[s:e]...)
-                               s = e + 1
-                       }
-                       if s < len(wh.wss) {
-                               news = append(news, wh.wss[s:]...)
-                       }
-                       wh.wss = news
-                       wh.lock.Unlock()
                }
        }
 }
 
-func (wh *KeepaliveRunner) Accept(ws *WebSocket) {
+func (wh *HealthCheck) check(ws *WebSocket) {
+       wh.goroutine.Do(func(ctx context.Context) {
+               if err := ws.CheckHealth(ctx); err != nil {
+                       wh.Remove(ws)
+                       log.Error(fmt.Sprintf("checker removed unhealth 
websocket[%s]", ws.RemoteAddr), err)
+               }
+       })
+}
+
+func (wh *HealthCheck) Accept(ws *WebSocket) int {
        wh.lock.Lock()
        wh.wss = append(wh.wss, ws)
+       n := len(wh.wss)
+       wh.lock.Unlock()
+       return n
+}
+
+func (wh *HealthCheck) Remove(ws *WebSocket) int {
+       wh.lock.Lock()
+       for i, t := range wh.wss {
+               if t == ws {
+                       wh.wss = append(wh.wss[0:i], wh.wss[i+1:]...)
+                       break
+               }
+       }
+       n := len(wh.wss)
        wh.lock.Unlock()
+       return n
 }
 
-func NewRunner() *KeepaliveRunner {
-       return &KeepaliveRunner{
+func NewHealthCheck() *HealthCheck {
+       return &HealthCheck{
                goroutine: gopool.New(context.Background()),
        }
 }
 
-func Runner() *KeepaliveRunner {
-       return runner
+func HealthChecker() *HealthCheck {
+       return checker
 }
diff --git a/server/connection/ws/options.go b/server/connection/ws/health_check_test.go
similarity index 62%
copy from server/connection/ws/options.go
copy to server/connection/ws/health_check_test.go
index 5817ef0..7ab0578 100644
--- a/server/connection/ws/options.go
+++ b/server/connection/ws/health_check_test.go
@@ -15,22 +15,27 @@
  * limitations under the License.
  */
 
-package ws
+package ws_test
 
 import (
-       "time"
-
-       "github.com/apache/servicecomb-service-center/server/connection"
+       "github.com/apache/servicecomb-service-center/server/connection/ws"
+       "github.com/stretchr/testify/assert"
+       "testing"
 )
 
-type Options struct {
-       ReadTimeout time.Duration
-       SendTimeout time.Duration
+func TestNewHealthCheck(t *testing.T) {
+       t.Run("should not return nil when new", func(t *testing.T) {
+               assert.NotNil(t, ws.NewHealthCheck())
+       })
 }
 
-func ToOptions() Options {
-       return Options{
-               ReadTimeout: connection.ReadTimeout,
-               SendTimeout: connection.SendTimeout,
-       }
+func TestHealthCheck_Run(t *testing.T) {
+       mock := NewTest()
+
+       t.Run("should return 1 when accept one ws", func(t *testing.T) {
+               check := ws.NewHealthCheck()
+               webSocket := ws.NewWebSocket("", "", mock.ServerConn)
+               assert.Equal(t, 1, check.Accept(webSocket))
+               assert.Equal(t, 0, check.Remove(webSocket))
+       })
 }
diff --git a/server/connection/ws/options.go b/server/connection/ws/options.go
index 5817ef0..3196623 100644
--- a/server/connection/ws/options.go
+++ b/server/connection/ws/options.go
@@ -24,13 +24,15 @@ import (
 )
 
 type Options struct {
-       ReadTimeout time.Duration
-       SendTimeout time.Duration
+       ReadTimeout    time.Duration
+       SendTimeout    time.Duration
+       HealthInterval time.Duration
 }
 
 func ToOptions() Options {
        return Options{
-               ReadTimeout: connection.ReadTimeout,
-               SendTimeout: connection.SendTimeout,
+               ReadTimeout:    connection.ReadTimeout,
+               SendTimeout:    connection.SendTimeout,
+               HealthInterval: connection.HeartbeatInterval,
        }
 }
diff --git a/server/connection/ws/websocket.go b/server/connection/ws/websocket.go
index fc6db9c..24a7e2b 100644
--- a/server/connection/ws/websocket.go
+++ b/server/connection/ws/websocket.go
@@ -19,13 +19,9 @@ package ws
 
 import (
        "context"
-       "encoding/json"
        "fmt"
        "github.com/apache/servicecomb-service-center/pkg/log"
-       pb "github.com/apache/servicecomb-service-center/pkg/registry"
-       "github.com/apache/servicecomb-service-center/pkg/util"
        "github.com/apache/servicecomb-service-center/server/connection"
-       "github.com/apache/servicecomb-service-center/server/event"
        serviceUtil 
"github.com/apache/servicecomb-service-center/server/service/util"
        "github.com/gorilla/websocket"
        "time"
@@ -35,258 +31,182 @@ const Websocket = "Websocket"
 
 type WebSocket struct {
        Options
-
-       ctx    context.Context
-       ticker *time.Ticker
-       conn   *websocket.Conn
-       // watcher subscribe the notification service event
-       watcher         *event.InstanceSubscriber
-       needPingWatcher bool
-       free            chan struct{}
-       closed          chan struct{}
+       Conn          *websocket.Conn
+       RemoteAddr    string
+       DomainProject string
+       ConsumerID    string
+
+       ticker   *time.Ticker
+       needPing bool
+       idleCh   chan struct{}
 }
 
-func (wh *WebSocket) Init() error {
-       wh.ticker = time.NewTicker(connection.HeartbeatInterval)
-       wh.needPingWatcher = true
-       wh.free = make(chan struct{}, 1)
-       wh.closed = make(chan struct{})
-
-       wh.SetReady()
+func (wh *WebSocket) Init() {
+       wh.RemoteAddr = wh.Conn.RemoteAddr().String()
+       wh.ticker = time.NewTicker(wh.HealthInterval)
+       wh.needPing = true
+       wh.idleCh = make(chan struct{}, 1)
 
-       remoteAddr := wh.conn.RemoteAddr().String()
+       wh.registerMessageHandler()
 
-       // put in notification service queue
-       if err := event.Center().AddSubscriber(wh.watcher); err != nil {
-               err = fmt.Errorf("establish[%s] websocket watch failed: notify 
service error, %s",
-                       remoteAddr, err.Error())
-               log.Errorf(nil, err.Error())
+       wh.SetIdle()
 
-               werr := wh.conn.WriteMessage(websocket.TextMessage, 
util.StringToBytesWithNoCopy(err.Error()))
-               if werr != nil {
-                       log.Errorf(werr, "establish[%s] websocket watch failed: 
write message failed.", remoteAddr)
-               }
-               return err
-       }
-
-       // put in runner queue
-       Runner().Accept(wh)
-
-       log.Debugf("start watching instance status, watcher[%s], subject: %s, 
group: %s",
-               remoteAddr, wh.watcher.Subject(), wh.watcher.Group())
-       return nil
+       log.Debugf("start watching instance status, subscriber[%s], consumer: 
%s",
+               wh.RemoteAddr, wh.ConsumerID)
 }
 
-func (wh *WebSocket) HandleControlMessage() {
-       remoteAddr := wh.conn.RemoteAddr().String()
+func (wh *WebSocket) registerMessageHandler() {
+       remoteAddr := wh.RemoteAddr
        // PING
-       wh.conn.SetPingHandler(func(message string) error {
+       wh.Conn.SetPingHandler(func(message string) error {
                defer func() {
-                       err := 
wh.conn.SetReadDeadline(time.Now().Add(wh.ReadTimeout))
+                       err := 
wh.Conn.SetReadDeadline(time.Now().Add(wh.ReadTimeout))
                        if err != nil {
                                log.Error("", err)
                        }
                }()
-               if wh.needPingWatcher {
-                       log.Infof("received 'Ping' message '%s' from 
watcher[%s], no longer send 'Ping' to it, subject: %s, group: %s",
-                               message, remoteAddr, wh.watcher.Subject(), 
wh.watcher.Group())
+               if wh.needPing {
+                       log.Infof("received 'Ping' message '%s' from 
subscriber[%s], no longer send 'Ping' to it, consumer: %s",
+                               message, remoteAddr, wh.ConsumerID)
                }
-               wh.needPingWatcher = false
+               wh.needPing = false
                return wh.WritePingPong(websocket.PongMessage)
        })
        // PONG
-       wh.conn.SetPongHandler(func(message string) error {
+       wh.Conn.SetPongHandler(func(message string) error {
                defer func() {
-                       err := 
wh.conn.SetReadDeadline(time.Now().Add(wh.ReadTimeout))
+                       err := 
wh.Conn.SetReadDeadline(time.Now().Add(wh.ReadTimeout))
                        if err != nil {
                                log.Error("", err)
                        }
                }()
-               log.Debugf("received 'Pong' message '%s' from watcher[%s], 
subject: %s, group: %s",
-                       message, remoteAddr, wh.watcher.Subject(), 
wh.watcher.Group())
+               log.Debugf("received 'Pong' message '%s' from subscriber[%s], 
consumer: %s",
+                       message, remoteAddr, wh.ConsumerID)
                return nil
        })
        // CLOSE
-       wh.conn.SetCloseHandler(func(code int, text string) error {
-               log.Infof("watcher[%s] active closed, code: %d, message: '%s', 
subject: %s, group: %s",
-                       remoteAddr, code, text, wh.watcher.Subject(), 
wh.watcher.Group())
+       wh.Conn.SetCloseHandler(func(code int, text string) error {
+               log.Infof("subscriber[%s] active closed, code: %d, message: 
'%s', consumer: %s",
+                       remoteAddr, code, text, wh.ConsumerID)
                return wh.sendClose(code, text)
        })
+}
 
-       wh.conn.SetReadLimit(connection.ReadMaxBody)
-       err := wh.conn.SetReadDeadline(time.Now().Add(wh.ReadTimeout))
+func (wh *WebSocket) ReadMessage() error {
+       wh.Conn.SetReadLimit(connection.ReadMaxBody)
+       err := wh.Conn.SetReadDeadline(time.Now().Add(wh.ReadTimeout))
        if err != nil {
                log.Error("", err)
        }
        for {
-               _, _, err := wh.conn.ReadMessage()
+               _, _, err := wh.Conn.ReadMessage()
                if err != nil {
-                       // client close or conn broken
-                       wh.watcher.SetError(err)
-                       return
+                       return err
                }
        }
 }
 
 func (wh *WebSocket) sendClose(code int, text string) error {
-       remoteAddr := wh.conn.RemoteAddr().String()
+       remoteAddr := wh.Conn.RemoteAddr().String()
        var message []byte
        if code != websocket.CloseNoStatusReceived {
                message = websocket.FormatCloseMessage(code, text)
        }
-       err := wh.conn.WriteControl(websocket.CloseMessage, message, 
time.Now().Add(wh.SendTimeout))
+       err := wh.Conn.WriteControl(websocket.CloseMessage, message, 
time.Now().Add(wh.SendTimeout))
        if err != nil {
-               log.Errorf(err, "watcher[%s] catch an err, subject: %s, group: 
%s",
-                       remoteAddr, wh.watcher.Subject(), wh.watcher.Group())
+               log.Errorf(err, "subscriber[%s] catch an err, consumer: %s",
+                       remoteAddr, wh.ConsumerID)
                return err
        }
        return nil
 }
 
-// Pick will be called by runner
-func (wh *WebSocket) Pick() interface{} {
+// NeedCheck will be called by checker
+func (wh *WebSocket) NeedCheck() interface{} {
        select {
-       case <-wh.Ready():
-               if wh.watcher.Err() != nil {
-                       return wh.watcher.Err()
-               }
-
+       case <-wh.Idle():
                select {
                case t := <-wh.ticker.C:
                        return t
-               case j := <-wh.watcher.Job:
-                       if j == nil {
-                               return fmt.Errorf("server shutdown")
-                       }
-                       return j
                default:
-                       // reset if idle
-                       wh.SetReady()
+                       // reset if idleCh
+                       wh.SetIdle()
                }
        default:
        }
-
        return nil
 }
 
-// HandleEvent will be called if Pick() returns not nil
-func (wh *WebSocket) HandleEvent(o interface{}) {
-       defer wh.SetReady()
-
-       var remoteAddr = wh.conn.RemoteAddr().String()
-       switch o := o.(type) {
-       case error:
-               log.Errorf(o, "watcher[%s] catch an err, subject: %s, group: 
%s",
-                       remoteAddr, wh.watcher.Subject(), wh.watcher.Group())
-               _ = wh.write(util.StringToBytesWithNoCopy(fmt.Sprintf("watcher 
catch an err: %s", o.Error())))
-       case time.Time:
-               wh.Keepalive()
-       case *event.InstanceEvent:
-               wh.WriteInstanceEvent(o)
-       default:
-               log.Errorf(nil, "watcher[%s] unknown input %v, subject: %s, 
group: %s",
-                       remoteAddr, o, wh.watcher.Subject(), wh.watcher.Group())
-       }
-}
+// CheckHealth will be called if NeedCheck() returns not nil
+func (wh *WebSocket) CheckHealth(ctx context.Context) error {
+       defer wh.SetIdle()
 
-func (wh *WebSocket) Keepalive() {
-       domainProject := util.ParseDomainProject(wh.ctx)
-       if !serviceUtil.ServiceExist(wh.ctx, domainProject, wh.watcher.Group()) 
{
-               _ = wh.write(util.StringToBytesWithNoCopy("Service does not 
exit."))
-               return
+       if !wh.needPing {
+               return nil
        }
 
-       if !wh.needPingWatcher {
-               return
+       if !serviceUtil.ServiceExist(ctx, wh.DomainProject, wh.ConsumerID) {
+               return fmt.Errorf("Service does not exist.")
        }
 
-       remoteAddr := wh.conn.RemoteAddr().String()
+       remoteAddr := wh.Conn.RemoteAddr().String()
        if err := wh.WritePingPong(websocket.PingMessage); err != nil {
-               log.Errorf(err, "send 'Ping' message to watcher[%s] failed, 
subject: %s, group: %s",
-                       remoteAddr, wh.watcher.Subject(), wh.watcher.Group())
-               return
+               log.Errorf(err, "send 'Ping' message to subscriber[%s] failed, 
consumer: %s",
+                       remoteAddr, wh.ConsumerID)
+               return err
        }
 
-       log.Debugf("send 'Ping' message to watcher[%s], subject: %s, group: %s",
-               remoteAddr, wh.watcher.Subject(), wh.watcher.Group())
-       return
+       log.Debugf("send 'Ping' message to subscriber[%s], consumer: %s",
+               remoteAddr, wh.ConsumerID)
+       return nil
 }
 
 func (wh *WebSocket) WritePingPong(messageType int) error {
-       err := wh.conn.WriteControl(messageType, []byte{}, 
time.Now().Add(wh.SendTimeout))
+       err := wh.Conn.WriteControl(messageType, []byte{}, 
time.Now().Add(wh.SendTimeout))
        if err != nil {
                messageTypeName := "Ping"
                if messageType == websocket.PongMessage {
                        messageTypeName = "Pong"
                }
-               log.Errorf(err, "fail to send '%s' to watcher[%s], subject: %s, 
group: %s",
-                       messageTypeName, wh.conn.RemoteAddr(), 
wh.watcher.Subject(), wh.watcher.Group())
-               //wh.watcher.SetError(err)
+               log.Errorf(err, "fail to send '%s' to subscriber[%s], consumer: 
%s",
+                       messageTypeName, wh.Conn.RemoteAddr(), wh.ConsumerID)
+               //wh.subscriber.SetError(err)
                return err
        }
        return nil
 }
 
-func (wh *WebSocket) WriteInstanceEvent(evt *event.InstanceEvent) {
-       resp := evt.Response
-       providerFlag := fmt.Sprintf("%s/%s/%s", resp.Key.AppId, 
resp.Key.ServiceName, resp.Key.Version)
-       if resp.Action != string(pb.EVT_EXPIRE) {
-               providerFlag = fmt.Sprintf("%s/%s(%s)", 
resp.Instance.ServiceId, resp.Instance.InstanceId, providerFlag)
-       }
-       remoteAddr := wh.conn.RemoteAddr().String()
-       log.Infof("event[%s] is coming in, watcher[%s] watch %s, subject: %s, 
group: %s",
-               resp.Action, remoteAddr, providerFlag, wh.watcher.Subject(), 
wh.watcher.Group())
-
-       resp.Response = nil
-       data, err := json.Marshal(resp)
-       if err != nil {
-               log.Errorf(err, "watcher[%s] watch %s, subject: %s, group: %s",
-                       remoteAddr, providerFlag, evt, wh.watcher.Subject(), 
wh.watcher.Group())
-               data = util.StringToBytesWithNoCopy(fmt.Sprintf("marshal output 
file error, %s", err.Error()))
-       }
-       err = wh.write(data)
-       connection.ReportPublishCompleted(evt, err)
-}
-
-func (wh *WebSocket) write(message []byte) error {
-       select {
-       case <-wh.closed:
-               return nil
-       default:
-       }
-
-       err := wh.conn.SetWriteDeadline(time.Now().Add(wh.SendTimeout))
+func (wh *WebSocket) WriteTextMessage(message []byte) error {
+       err := wh.Conn.SetWriteDeadline(time.Now().Add(wh.SendTimeout))
        if err != nil {
                return err
        }
-       err = wh.conn.WriteMessage(websocket.TextMessage, message)
+       err = wh.Conn.WriteMessage(websocket.TextMessage, message)
        if err != nil {
-               log.Errorf(err, "watcher[%s] catch an err, subject: %s, group: 
%s",
-                       wh.conn.RemoteAddr().String(), wh.watcher.Subject(), 
wh.watcher.Group())
+               log.Errorf(err, "subscriber[%s] catch an err, msg size: %d",
+                       wh.Conn.RemoteAddr().String(), len(message))
        }
        return err
 }
 
-func (wh *WebSocket) Ready() <-chan struct{} {
-       return wh.free
+func (wh *WebSocket) Idle() <-chan struct{} {
+       return wh.idleCh
 }
 
-func (wh *WebSocket) SetReady() {
+func (wh *WebSocket) SetIdle() {
        select {
-       case wh.free <- struct{}{}:
+       case wh.idleCh <- struct{}{}:
        default:
        }
 }
 
-func (wh *WebSocket) Stop() {
-       close(wh.closed)
-}
-
-func New(ctx context.Context, conn *websocket.Conn, watcher 
*event.InstanceSubscriber) *WebSocket {
-       return &WebSocket{
-               Options: ToOptions(),
-               ctx:     ctx,
-               conn:    conn,
-               watcher: watcher,
+func NewWebSocket(domainProject, serviceID string, conn *websocket.Conn) 
*WebSocket {
+       ws := &WebSocket{
+               Options:       ToOptions(),
+               DomainProject: domainProject,
+               ConsumerID:    serviceID,
+               Conn:          conn,
        }
+       ws.Init()
+       return ws
 }
diff --git a/server/connection/ws/websocket_test.go 
b/server/connection/ws/websocket_test.go
index 8338b8f..ca40bd6 100644
--- a/server/connection/ws/websocket_test.go
+++ b/server/connection/ws/websocket_test.go
@@ -16,118 +16,149 @@
  */
 package ws_test
 
-// initialize
-import _ "github.com/apache/servicecomb-service-center/test"
 import (
+       _ "github.com/apache/servicecomb-service-center/test"
+
        "context"
-       "errors"
-       "github.com/apache/servicecomb-service-center/pkg/registry"
-       wss "github.com/apache/servicecomb-service-center/server/connection/ws"
-       "github.com/apache/servicecomb-service-center/server/core"
-       "github.com/apache/servicecomb-service-center/server/core/proto"
-       "github.com/apache/servicecomb-service-center/server/event"
-       "github.com/gorilla/websocket"
        "net/http"
        "net/http/httptest"
        "strings"
        "testing"
        "time"
+
+       wss "github.com/apache/servicecomb-service-center/server/connection/ws"
+       "github.com/apache/servicecomb-service-center/server/core"
+       "github.com/apache/servicecomb-service-center/server/event"
+       "github.com/gorilla/websocket"
+       "github.com/stretchr/testify/assert"
 )
 
 var closeCh = make(chan struct{})
 
-type watcherConn struct {
-}
-
 func init() {
        testing.Init()
        core.Initialize()
 }
+
+type watcherConn struct {
+       ClientConn *websocket.Conn
+       ServerConn *websocket.Conn
+}
+
+func (h *watcherConn) Test() {
+       s := httptest.NewServer(h)
+       h.ClientConn, _, _ = websocket.DefaultDialer.Dial(
+               strings.Replace(s.URL, "http://", "ws://", 1), nil)
+}
+
 func (h *watcherConn) ServeHTTP(w http.ResponseWriter, r *http.Request) {
        var upgrader = websocket.Upgrader{}
-       conn, _ := upgrader.Upgrade(w, r, nil)
+       h.ServerConn, _ = upgrader.Upgrade(w, r, nil)
        for {
-               conn.WriteControl(websocket.PingMessage, []byte{}, 
time.Now().Add(time.Second))
-               conn.WriteControl(websocket.PongMessage, []byte{}, 
time.Now().Add(time.Second))
-               _, _, err := conn.ReadMessage()
+               //h.ServerConn.WriteControl(websocket.PingMessage, []byte{}, 
time.Now().Add(time.Second))
+               //h.ServerConn.WriteControl(websocket.PongMessage, []byte{}, 
time.Now().Add(time.Second))
+               _, _, err := h.ServerConn.ReadMessage()
                if err != nil {
                        return
                }
                <-closeCh
-               conn.WriteControl(websocket.CloseMessage, []byte{}, 
time.Now().Add(time.Second))
-               conn.Close()
+               h.ServerConn.WriteControl(websocket.CloseMessage, []byte{}, 
time.Now().Add(time.Second))
+               h.ServerConn.Close()
                return
        }
 }
 
-func TestDoWebSocketListAndWatch(t *testing.T) {
-       s := httptest.NewServer(&watcherConn{})
-
-       conn, _, _ := websocket.DefaultDialer.Dial(
-               strings.Replace(s.URL, "http://", "ws://", 1), nil)
-
-       wss.SendEstablishError(conn, errors.New("error"))
+func NewTest() *watcherConn {
+       ts := &watcherConn{}
+       ts.Test()
+       return ts
+}
 
-       w := event.NewInstanceSubscriber("g", "s", func() (results 
[]*registry.WatchInstanceResponse, rev int64) {
-               results = append(results, &registry.WatchInstanceResponse{
-                       Response: proto.CreateResponse(proto.Response_SUCCESS, 
"ok"),
-                       Action:   string(registry.EVT_CREATE),
-                       Key:      &registry.MicroServiceKey{},
-                       Instance: &registry.MicroServiceInstance{},
-               })
-               return
+func TestNewWebSocket(t *testing.T) {
+       mock := NewTest()
+       t.Run("should return not nil when new", func(t *testing.T) {
+               assert.NotNil(t, wss.NewWebSocket("", "", mock.ServerConn))
        })
+}
 
-       ws := wss.New(context.Background(), conn, w)
-       err := ws.Init()
-       if err != nil {
-               t.Fatalf("TestPublisher_Run")
+func TestWebSocket_NeedCheck(t *testing.T) {
+       mock := NewTest()
+       conn := mock.ServerConn
+       options := wss.ToOptions()
+       webSocket := &wss.WebSocket{
+               Options:       options,
+               DomainProject: "default",
+               ConsumerID:    "",
+               Conn:          conn,
        }
 
-       event.Center().Start()
-
-       go func() {
-               wss.ListAndWatch(context.Background(), "", nil, conn)
-
-               w2 := event.NewInstanceSubscriber("g", "s", func() (results 
[]*registry.WatchInstanceResponse, rev int64) {
-                       return
-               })
-               ws2 := wss.New(context.Background(), conn, w2)
-               err := ws2.Init()
-               if err != nil {
-                       t.Fatalf("TestPublisher_Run")
-               }
-       }()
-
-       go ws.HandleControlMessage()
-
-       w.OnMessage(nil)
-       w.OnMessage(&event.InstanceEvent{})
-
-       event.Center().Fire(event.NewInstanceEvent("g", "s", 1, 
&registry.WatchInstanceResponse{
-               Response: proto.CreateResponse(proto.Response_SUCCESS, "ok"),
-               Action:   string(registry.EVT_CREATE),
-               Key:      &registry.MicroServiceKey{},
-               Instance: &registry.MicroServiceInstance{},
-       }))
-
-       <-time.After(time.Second)
-
-       ws.HandleEvent(nil)
-
-       ws.WritePingPong(websocket.PingMessage)
-       ws.WritePingPong(websocket.PongMessage)
-
-       ws.HandleEvent(time.Now())
+       t.Run("should not check when new", func(t *testing.T) {
+               webSocket.HealthInterval = time.Second
+               webSocket.Init()
+               assert.Nil(t, webSocket.NeedCheck())
+       })
 
-       closeCh <- struct{}{}
+       t.Run("should check when check time up", func(t *testing.T) {
+               webSocket.HealthInterval = time.Microsecond
+               webSocket.Init()
+               <-time.After(time.Microsecond)
+               assert.NotNil(t, webSocket.NeedCheck())
+       })
+       t.Run("should not check when busy", func(t *testing.T) {
+               webSocket.HealthInterval = time.Microsecond
+               webSocket.Init()
+               <-time.After(time.Microsecond)
+               assert.NotNil(t, webSocket.NeedCheck())
+               assert.Nil(t, webSocket.NeedCheck())
+       })
+}
 
-       <-time.After(time.Second)
+func TestWebSocket_Idle(t *testing.T) {
+       mock := NewTest()
+       webSocket := wss.NewWebSocket("", "", mock.ServerConn)
 
-       ws.WritePingPong(websocket.PingMessage)
-       ws.WritePingPong(websocket.PongMessage)
+       t.Run("should idle when new", func(t *testing.T) {
+               select {
+               case <-webSocket.Idle():
+               default:
+                       assert.Fail(t, "not idle")
+               }
+       })
+       t.Run("should idle when setIdle", func(t *testing.T) {
+               select {
+               case <-webSocket.Idle():
+                       assert.Fail(t, "idle")
+               default:
+                       webSocket.SetIdle()
+                       select {
+                       case <-webSocket.Idle():
+                       default:
+                               assert.Fail(t, "not idle")
+                       }
+               }
+       })
+       t.Run("should idle when checkHealth", func(t *testing.T) {
+               _ = webSocket.CheckHealth(context.Background())
+               select {
+               case <-webSocket.Idle():
+               default:
+                       assert.Fail(t, "not idle")
+               }
+       })
+}
 
-       w.OnMessage(nil)
+func TestWebSocket_CheckHealth(t *testing.T) {
+       mock := NewTest()
+       event.Center().Start()
 
-       wss.Runner().Stop()
+       t.Run("should do nothing when recv PING", func(t *testing.T) {
+               ws := wss.NewWebSocket("", "", mock.ServerConn)
+               mock.ClientConn.WriteControl(websocket.PingMessage, []byte{}, 
time.Now().Add(time.Second))
+               <-time.After(time.Second)
+               assert.Nil(t, ws.CheckHealth(context.Background()))
+       })
+       t.Run("should return err when consumer not exist", func(t *testing.T) {
+               ws := wss.NewWebSocket("", "", mock.ServerConn)
+               assert.Equal(t, "Service does not exist.", 
ws.CheckHealth(context.Background()).Error())
+       })
 }
diff --git a/server/core/proto/services.go b/server/core/proto/services.go
index 12f6af4..37d82a1 100644
--- a/server/core/proto/services.go
+++ b/server/core/proto/services.go
@@ -28,7 +28,6 @@ type ServiceInstanceCtrlServerEx interface {
        BatchFind(ctx context.Context, in *registry.BatchFindInstancesRequest) 
(*registry.BatchFindInstancesResponse, error)
 
        WebSocketWatch(ctx context.Context, in *registry.WatchInstanceRequest, 
conn *websocket.Conn)
-       WebSocketListAndWatch(ctx context.Context, in 
*registry.WatchInstanceRequest, conn *websocket.Conn)
 
        ClusterHealth(ctx context.Context) (*registry.GetInstancesResponse, 
error)
 }
diff --git a/server/event/instance_subscriber.go 
b/server/event/instance_subscriber.go
index c5fad5a..cb1895f 100644
--- a/server/event/instance_subscriber.go
+++ b/server/event/instance_subscriber.go
@@ -18,11 +18,8 @@
 package event
 
 import (
-       "context"
        "github.com/apache/servicecomb-service-center/pkg/event"
-       "github.com/apache/servicecomb-service-center/pkg/gopool"
        "github.com/apache/servicecomb-service-center/pkg/log"
-       pb "github.com/apache/servicecomb-service-center/pkg/registry"
        "time"
 )
 
@@ -30,10 +27,7 @@ const AddJobTimeout = 1 * time.Second
 
 type InstanceSubscriber struct {
        event.Subscriber
-       Job          chan *InstanceEvent
-       ListRevision int64
-       ListFunc     func() (results []*pb.WatchInstanceResponse, rev int64)
-       listCh       chan struct{}
+       Job chan *InstanceEvent
 }
 
 func (w *InstanceSubscriber) SetError(err error) {
@@ -50,19 +44,6 @@ func (w *InstanceSubscriber) OnAccept() {
                return
        }
        log.Debugf("accepted by event service, %s watcher %s %s", w.Type(), 
w.Group(), w.Subject())
-       gopool.Go(w.listAndPublishJobs)
-}
-
-func (w *InstanceSubscriber) listAndPublishJobs(_ context.Context) {
-       defer close(w.listCh)
-       if w.ListFunc == nil {
-               return
-       }
-       results, rev := w.ListFunc()
-       w.ListRevision = rev
-       for _, response := range results {
-               w.sendMessage(NewInstanceEvent(w.Group(), w.Subject(), 
w.ListRevision, response))
-       }
 }
 
 //被通知
@@ -75,26 +56,6 @@ func (w *InstanceSubscriber) OnMessage(job event.Event) {
        if !ok {
                return
        }
-
-       select {
-       case <-w.listCh:
-       default:
-               timer := time.NewTimer(w.Timeout())
-               select {
-               case <-w.listCh:
-                       timer.Stop()
-               case <-timer.C:
-                       log.Errorf(nil,
-                               "the %s listwatcher %s %s is not ready[over 
%s], send the event %v",
-                               w.Type(), w.Group(), w.Subject(), w.Timeout(), 
job)
-               }
-       }
-
-       if wJob.Revision <= w.ListRevision {
-               log.Warnf("unexpected event %s job is coming in, watcher %s %s, 
job is %v, current revision is %v",
-                       w.Type(), w.Group(), w.Subject(), job, w.ListRevision)
-               return
-       }
        w.sendMessage(wJob)
 }
 
@@ -123,13 +84,10 @@ func (w *InstanceSubscriber) Close() {
        close(w.Job)
 }
 
-func NewInstanceSubscriber(serviceID, domainProject string,
-       listFunc func() (results []*pb.WatchInstanceResponse, rev int64)) 
*InstanceSubscriber {
+func NewInstanceSubscriber(serviceID, domainProject string) 
*InstanceSubscriber {
        watcher := &InstanceSubscriber{
                Subscriber: event.NewSubscriber(INSTANCE, domainProject, 
serviceID),
                Job:        make(chan *InstanceEvent, INSTANCE.QueueSize()),
-               ListFunc:   listFunc,
-               listCh:     make(chan struct{}),
        }
        return watcher
 }
diff --git a/server/rest/controller/v3/instance_watcher.go 
b/server/rest/controller/v3/instance_watcher.go
index 7395c4c..6d0690b 100644
--- a/server/rest/controller/v3/instance_watcher.go
+++ b/server/rest/controller/v3/instance_watcher.go
@@ -28,6 +28,5 @@ type WatchService struct {
 func (this *WatchService) URLPatterns() []rest.Route {
        return []rest.Route{
                {rest.HTTPMethodGet, 
"/registry/v3/microservices/:serviceId/watcher", this.Watch},
-               {rest.HTTPMethodGet, 
"/registry/v3/microservices/:serviceId/listwatcher", this.ListAndWatch},
        }
 }
diff --git a/server/rest/controller/v4/instance_watcher.go 
b/server/rest/controller/v4/instance_watcher.go
index 9c1b1e9..347603f 100644
--- a/server/rest/controller/v4/instance_watcher.go
+++ b/server/rest/controller/v4/instance_watcher.go
@@ -33,7 +33,6 @@ type WatchService struct {
 func (s *WatchService) URLPatterns() []rest.Route {
        return []rest.Route{
                {Method: rest.HTTPMethodGet, Path: 
"/v4/:project/registry/microservices/:serviceId/watcher", Func: s.Watch},
-               {Method: rest.HTTPMethodGet, Path: 
"/v4/:project/registry/microservices/:serviceId/listwatcher", Func: 
s.ListAndWatch},
        }
 }
 
@@ -62,16 +61,3 @@ func (s *WatchService) Watch(w http.ResponseWriter, r 
*http.Request) {
                SelfServiceId: r.URL.Query().Get(":serviceId"),
        }, conn)
 }
-
-func (s *WatchService) ListAndWatch(w http.ResponseWriter, r *http.Request) {
-       conn, err := upgrade(w, r)
-       if err != nil {
-               return
-       }
-       defer conn.Close()
-
-       r.Method = "WATCHLIST"
-       core.InstanceAPI.WebSocketListAndWatch(r.Context(), 
&pb.WatchInstanceRequest{
-               SelfServiceId: r.URL.Query().Get(":serviceId"),
-       }, conn)
-}
diff --git a/server/service/watch.go b/server/service/watch.go
index 2163f03..b12cf82 100644
--- a/server/service/watch.go
+++ b/server/service/watch.go
@@ -48,7 +48,7 @@ func (s *InstanceService) Watch(in *pb.WatchInstanceRequest, 
stream proto.Servic
                return err
        }
 
-       return grpc.ListAndWatch(stream.Context(), in.SelfServiceId, nil, 
stream)
+       return grpc.Watch(stream.Context(), in.SelfServiceId, stream)
 }
 
 func (s *InstanceService) WebSocketWatch(ctx context.Context, in 
*pb.WatchInstanceRequest, conn *websocket.Conn) {
@@ -57,16 +57,5 @@ func (s *InstanceService) WebSocketWatch(ctx 
context.Context, in *pb.WatchInstan
                ws.SendEstablishError(conn, err)
                return
        }
-       ws.ListAndWatch(ctx, in.SelfServiceId, nil, conn)
-}
-
-func (s *InstanceService) WebSocketListAndWatch(ctx context.Context, in 
*pb.WatchInstanceRequest, conn *websocket.Conn) {
-       log.Infof("new a web socket list and watch with service[%s]", 
in.SelfServiceId)
-       if err := s.WatchPreOpera(ctx, in); err != nil {
-               ws.SendEstablishError(conn, err)
-               return
-       }
-       ws.ListAndWatch(ctx, in.SelfServiceId, func() 
([]*pb.WatchInstanceResponse, int64) {
-               return serviceUtil.QueryAllProvidersInstances(ctx, 
in.SelfServiceId)
-       }, conn)
+       ws.Watch(ctx, in.SelfServiceId, conn)
 }
diff --git a/server/service/watch_test.go b/server/service/watch_test.go
index fe2dfd1..64e3622 100644
--- a/server/service/watch_test.go
+++ b/server/service/watch_test.go
@@ -46,13 +46,6 @@ func TestInstanceService_WebSocketWatch(t *testing.T) {
        instanceResource.WebSocketWatch(context.Background(), 
&pb.WatchInstanceRequest{}, nil)
 }
 
-func TestInstanceService_WebSocketListAndWatch(t *testing.T) {
-       defer func() {
-               recover()
-       }()
-       instanceResource.WebSocketListAndWatch(context.Background(), 
&pb.WatchInstanceRequest{}, nil)
-}
-
 var _ = Describe("'Instance' service", func() {
        Describe("execute 'watch' operartion", func() {
                var (

Reply via email to