This is an automated email from the ASF dual-hosted git repository.

tianxiaoliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/servicecomb-service-center.git


The following commit(s) were added to refs/heads/master by this push:
     new cc0f684  SCB-2176 Refactor websocket (#986)
cc0f684 is described below

commit cc0f684449f6606c45c86d53508428a50eb32b84
Author: little-cui <[email protected]>
AuthorDate: Thu May 20 09:45:23 2021 +0800

    SCB-2176 Refactor websocket (#986)
    
    * SCB-2176 Refactor event bus
    
    * SCB-2176 Fix: UT failures
    
    * SCB-2176 Refactor websocket (#980)
    
    * SCB-2176 Refactor websocket
    
    * SCB-2176 Add UTs
    
    * SCB-2176 Fix: UT failures
    
    * SCB-2176 Fix: websocket health check failed
    
    * SCB-2176 Add benchmark for 10K connection
    
    * SCB-2176 Add benchmark for 10K connection
    
    * SCB-2176 Add event metrics
---
 datasource/etcd/event/instance_event_handler.go    |  29 +-
 datasource/etcd/sd/etcd/cacher_kv.go               |   1 +
 datasource/etcd/sd/metrics.go                      |  32 +++
 datasource/etcd/sd/servicecenter/common.go         |  30 +-
 datasource/etcd/util/microservice_util.go          |   7 -
 docs/user-guides/metrics.md                        |   4 +
 examples/infrastructures/docker/README.md          |   2 +-
 integration/apis.go                                |   1 -
 integration/health-metrics-grafana.json            | 174 ++++++++----
 integration/instances_test.go                      | 133 ++++++++-
 pkg/event/bus.go                                   |   6 +-
 pkg/event/subscriber.go                            |   5 +-
 pkg/proto/service_ex.go                            |   1 -
 pkg/util/context.go                                |  14 +-
 server/connection/grpc/stream.go                   |   7 +-
 server/connection/grpc/stream_test.go              |   4 +-
 server/connection/ws/broker.go                     |  81 ++++++
 .../connection/ws/broker_test.go                   |  28 +-
 server/connection/ws/common.go                     |  68 +++++
 .../connection/ws/common_test.go                   |  33 ++-
 .../ws/{publisher.go => health_check.go}           |  87 +++---
 .../ws/health_check_test.go}                       |  28 +-
 .../ws/options.go}                                 |  23 +-
 server/connection/ws/websocket.go                  | 307 +++++++--------------
 server/connection/ws/websocket_test.go             | 174 +++++++-----
 server/event/instance_event.go                     |  51 ++++
 server/event/instance_subscriber.go                | 131 +++------
 server/metrics/connection.go                       |  26 +-
 server/rest/controller/v3/instance_watcher.go      |   1 -
 server/rest/controller/v4/instance_watcher.go      |  14 -
 server/service/watch.go                            |  15 +-
 server/service/watch_test.go                       |   7 -
 32 files changed, 904 insertions(+), 620 deletions(-)

diff --git a/datasource/etcd/event/instance_event_handler.go 
b/datasource/etcd/event/instance_event_handler.go
index 7297b7c..b247c2a 100644
--- a/datasource/etcd/event/instance_event_handler.go
+++ b/datasource/etcd/event/instance_event_handler.go
@@ -22,8 +22,6 @@ import (
        "fmt"
        "strings"
 
-       pb "github.com/go-chassis/cari/discovery"
-
        "github.com/apache/servicecomb-service-center/datasource/etcd/cache"
        "github.com/apache/servicecomb-service-center/datasource/etcd/kv"
        "github.com/apache/servicecomb-service-center/datasource/etcd/path"
@@ -36,6 +34,7 @@ import (
        "github.com/apache/servicecomb-service-center/server/event"
        "github.com/apache/servicecomb-service-center/server/metrics"
        "github.com/apache/servicecomb-service-center/server/syncernotify"
+       pb "github.com/go-chassis/cari/discovery"
 )
 
 const (
@@ -65,12 +64,13 @@ func (h *InstanceEventHandler) OnEvent(evt sd.KvEvent) {
        domainName := domainProject[:idx]
        projectName := domainProject[idx+1:]
 
+       ctx := util.WithGlobal(util.WithCacheOnly(context.Background()))
+
        var count float64 = increaseOne
-       switch action {
-       case pb.EVT_INIT:
+       if action == pb.EVT_INIT {
                metrics.ReportInstances(domainName, count)
-               ms := serviceUtil.GetServiceFromCache(domainProject, providerID)
-               if ms == nil {
+               ms, err := serviceUtil.GetService(ctx, domainProject, 
providerID)
+               if err != nil {
                        log.Warnf("caught [%s] instance[%s/%s] event, endpoints 
%v, get cached provider's file failed",
                                action, providerID, providerInstanceID, 
instance.Endpoints)
                        return
@@ -78,11 +78,10 @@ func (h *InstanceEventHandler) OnEvent(evt sd.KvEvent) {
                frameworkName, frameworkVersion := getFramework(ms)
                metrics.ReportFramework(domainName, projectName, frameworkName, 
frameworkVersion, count)
                return
-       case pb.EVT_CREATE:
-               metrics.ReportInstances(domainName, count)
-       case pb.EVT_DELETE:
+       }
+
+       if action == pb.EVT_DELETE {
                count = decreaseOne
-               metrics.ReportInstances(domainName, count)
                if !core.IsDefaultDomainProject(domainProject) {
                        projectName := domainProject[idx+1:]
                        serviceUtil.RemandInstanceQuota(
@@ -91,9 +90,7 @@ func (h *InstanceEventHandler) OnEvent(evt sd.KvEvent) {
        }
 
        // 查询服务版本信息
-       ctx := util.WithGlobal(util.WithCacheOnly(context.Background()))
        ms, err := serviceUtil.GetService(ctx, domainProject, providerID)
-
        if err != nil {
                log.Error(fmt.Sprintf("caught [%s] instance[%s/%s] event, 
endpoints %v, get cached provider's file failed",
                        action, providerID, providerInstanceID, 
instance.Endpoints), err)
@@ -110,8 +107,11 @@ func (h *InstanceEventHandler) OnEvent(evt sd.KvEvent) {
                return
        }
 
-       frameworkName, frameworkVersion := getFramework(ms)
-       metrics.ReportFramework(domainName, projectName, frameworkName, 
frameworkVersion, count)
+       if action != pb.EVT_UPDATE {
+               frameworkName, frameworkVersion := getFramework(ms)
+               metrics.ReportInstances(domainName, count)
+               metrics.ReportFramework(domainName, projectName, frameworkName, 
frameworkVersion, count)
+       }
 
        log.Infof("caught [%s] service[%s][%s/%s/%s/%s] instance[%s] event, 
endpoints %v",
                action, providerID, ms.Environment, ms.AppId, ms.ServiceName, 
ms.Version,
@@ -146,7 +146,6 @@ func PublishInstanceEvent(evt sd.KvEvent, domainProject 
string, serviceKey *pb.M
                Instance: evt.KV.Value.(*pb.MicroServiceInstance),
        }
        for _, consumerID := range subscribers {
-               // TODO add超时怎么处理?
                evt := event.NewInstanceEventWithTime(consumerID, 
domainProject, evt.Revision, evt.CreateAt, response)
                err := event.Center().Fire(evt)
                if err != nil {
diff --git a/datasource/etcd/sd/etcd/cacher_kv.go 
b/datasource/etcd/sd/etcd/cacher_kv.go
index 11eb70a..4606fab 100644
--- a/datasource/etcd/sd/etcd/cacher_kv.go
+++ b/datasource/etcd/sd/etcd/cacher_kv.go
@@ -454,6 +454,7 @@ func (c *KvCacher) notify(evts []sd.KvEvent) {
        for _, evt := range evts {
                c.Cfg.OnEvent(evt)
        }
+       sd.ReportDispatchEventCompleted(c.Cfg.Key, evts)
 }
 
 func (c *KvCacher) doParse(src *sdcommon.Resource) (kv *sd.KeyValue) {
diff --git a/datasource/etcd/sd/metrics.go b/datasource/etcd/sd/metrics.go
index 82e38c0..f25e30b 100644
--- a/datasource/etcd/sd/metrics.go
+++ b/datasource/etcd/sd/metrics.go
@@ -43,6 +43,23 @@ var (
                        Help:       "Latency of backend events processing",
                        Objectives: metrics.Pxx,
                }, []string{"instance", "prefix"})
+
+       dispatchCounter = helper.NewGaugeVec(
+               prometheus.GaugeOpts{
+                       Namespace: metrics.FamilyName,
+                       Subsystem: "db",
+                       Name:      "dispatch_event_total",
+                       Help:      "Counter of backend events dispatch",
+               }, []string{"instance", "prefix"})
+
+       dispatchLatency = helper.NewSummaryVec(
+               prometheus.SummaryOpts{
+                       Namespace:  metrics.FamilyName,
+                       Subsystem:  "db",
+                       Name:       "dispatch_event_durations_microseconds",
+                       Help:       "Latency of backend events dispatch",
+                       Objectives: metrics.Pxx,
+               }, []string{"instance", "prefix"})
 )
 
 func ReportProcessEventCompleted(prefix string, evts []KvEvent) {
@@ -57,4 +74,19 @@ func ReportProcessEventCompleted(prefix string, evts 
[]KvEvent) {
                eventsLatency.WithLabelValues(instance, prefix).Observe(elapsed)
        }
        eventsCounter.WithLabelValues(instance, prefix).Add(l)
+       dispatchCounter.WithLabelValues(instance, prefix).Add(l)
+}
+
+func ReportDispatchEventCompleted(prefix string, evts []KvEvent) {
+       l := float64(len(evts))
+       if l == 0 {
+               return
+       }
+       instance := metrics.InstanceName()
+       now := time.Now()
+       for _, evt := range evts {
+               elapsed := float64(now.Sub(evt.CreateAt.Local()).Nanoseconds()) 
/ float64(time.Microsecond)
+               dispatchLatency.WithLabelValues(instance, 
prefix).Observe(elapsed)
+       }
+       dispatchCounter.WithLabelValues(instance, prefix).Add(-l)
 }
diff --git a/datasource/etcd/sd/servicecenter/common.go 
b/datasource/etcd/sd/servicecenter/common.go
index d59957f..a107db1 100644
--- a/datasource/etcd/sd/servicecenter/common.go
+++ b/datasource/etcd/sd/servicecenter/common.go
@@ -1,17 +1,19 @@
-// Licensed to the Apache Software Foundation (ASF) under one or more
-// contributor license agreements.  See the NOTICE file distributed with
-// this work for additional information regarding copyright ownership.
-// The ASF licenses this file to You under the Apache License, Version 2.0
-// (the "License"); you may not use this file except in compliance with
-// the License.  You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 
 package servicecenter
 
diff --git a/datasource/etcd/util/microservice_util.go 
b/datasource/etcd/util/microservice_util.go
index d7c30ef..6c45a06 100644
--- a/datasource/etcd/util/microservice_util.go
+++ b/datasource/etcd/util/microservice_util.go
@@ -64,13 +64,6 @@ func GetService(ctx context.Context, domainProject string, 
serviceID string) (*p
        return serviceResp.Kvs[0].Value.(*pb.MicroService), nil
 }
 
-// GetServiceFromCache gets service from cache
-func GetServiceFromCache(domainProject string, serviceID string) 
*pb.MicroService {
-       ctx := util.WithGlobal(util.WithCacheOnly(context.Background()))
-       svc, _ := GetService(ctx, domainProject, serviceID)
-       return svc
-}
-
 func getServicesRawData(ctx context.Context, domainProject string) 
([]*sd.KeyValue, error) {
        key := path.GenerateServiceKey(domainProject, "")
        opts := append(FromContext(ctx),
diff --git a/docs/user-guides/metrics.md b/docs/user-guides/metrics.md
index e3ead16..f815756 100644
--- a/docs/user-guides/metrics.md
+++ b/docs/user-guides/metrics.md
@@ -11,6 +11,8 @@
 ### Pub/Sub
 1. notify_publish_total
 1. notify_publish_durations_microseconds
+1. notify_pending_total
+1. notify_pending_durations_microseconds
 1. notify_subscriber_total
 
 ### Meta
@@ -25,6 +27,8 @@
 ### Backend
 1. db_backend_event_total
 1. db_backend_event_durations_microseconds
+1. db_dispatch_event_total
+1. db_dispatch_event_durations_microseconds
 1. db_backend_operation_total
 1. db_backend_operation_durations_microseconds
 1. db_backend_total
diff --git a/examples/infrastructures/docker/README.md 
b/examples/infrastructures/docker/README.md
index 1955c64..90c3b72 100644
--- a/examples/infrastructures/docker/README.md
+++ b/examples/infrastructures/docker/README.md
@@ -5,7 +5,7 @@ A simple demo to deploy ServiceCenter in docker environment.
 ## Quick Start
 
 ```bash
-cd $PROJECT_ROOT/integration/docker
+cd examples/infrastructures/docker
 docker-compose up
 ```
 This will start up ServiceCenter listening on `:30100` for handling requests 
and Dashboard listening on `:30103`.
diff --git a/integration/apis.go b/integration/apis.go
index d263a6b..f8aa164 100644
--- a/integration/apis.go
+++ b/integration/apis.go
@@ -47,7 +47,6 @@ var UPDATEINSTANCEMETADATA = 
"/v4/default/registry/microservices/:serviceId/inst
 var UPDATEINSTANCESTATUS = 
"/v4/default/registry/microservices/:serviceId/instances/:instanceId/status"
 var INSTANCEHEARTBEAT = 
"/v4/default/registry/microservices/:serviceId/instances/:instanceId/heartbeat"
 var INSTANCEWATCHER = "/v4/default/registry/microservices/:serviceId/watcher"
-var INSTANCELISTWATCHER = 
"/v4/default/registry/microservices/:serviceId/listwatcher"
 
 //Governance API's
 var GETGOVERNANCESERVICEDETAILS = "/v4/default/govern/microservices/:serviceId"
diff --git a/integration/health-metrics-grafana.json 
b/integration/health-metrics-grafana.json
index f730b2c..84f963b 100644
--- a/integration/health-metrics-grafana.json
+++ b/integration/health-metrics-grafana.json
@@ -1,18 +1,3 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License
-
 {
   "__inputs": [
     {
@@ -2183,7 +2168,7 @@
       "steppedLine": false,
       "targets": [
         {
-          "expr": 
"sum(irate(service_center_db_backend_event_total{job=\"service-center\"}[1m])) 
by (instance,prefix)",
+          "expr": 
"sum(irate(service_center_db_dispatch_event_total{job=\"service-center\"}[1m])) 
by (instance,prefix)",
           "format": "time_series",
           "intervalFactor": 1,
           "legendFormat": "{{instance}}> {{prefix}}",
@@ -2193,7 +2178,7 @@
       "thresholds": [],
       "timeFrom": null,
       "timeShift": null,
-      "title": "Publish Events",
+      "title": "Produce Events",
       "tooltip": {
         "shared": true,
         "sort": 0,
@@ -2277,7 +2262,7 @@
       "steppedLine": false,
       "targets": [
         {
-          "expr": 
"max(avg_over_time(service_center_db_backend_event_durations_microseconds{job=\"service-center\"}[1m]))
 by (instance, prefix)",
+          "expr": 
"max(avg_over_time(service_center_db_dispatch_event_durations_microseconds{job=\"service-center\"}[1m]))
 by (instance, prefix)",
           "format": "time_series",
           "intervalFactor": 1,
           "legendFormat": "{{instance}}> {{prefix}}",
@@ -2287,7 +2272,7 @@
       "thresholds": [],
       "timeFrom": null,
       "timeShift": null,
-      "title": "Publish Events Latency",
+      "title": "Produce Events Latency",
       "tooltip": {
         "shared": true,
         "sort": 0,
@@ -2381,7 +2366,7 @@
       "thresholds": [],
       "timeFrom": null,
       "timeShift": null,
-      "title": "Subscribe Events",
+      "title": "Post Events",
       "tooltip": {
         "shared": true,
         "sort": 0,
@@ -2475,7 +2460,7 @@
       "thresholds": [],
       "timeFrom": null,
       "timeShift": null,
-      "title": "Subscribe Events Latency",
+      "title": "Post Events Latency",
       "tooltip": {
         "shared": true,
         "sort": 0,
@@ -2526,8 +2511,91 @@
         "x": 0,
         "y": 40
       },
+      "id": 48,
+      "legend": {
+        "avg": false,
+        "current": false,
+        "max": false,
+        "min": false,
+        "show": false,
+        "total": false,
+        "values": false
+      },
+      "lines": true,
+      "linewidth": 1,
+      "links": [],
+      "nullPointMode": "null",
+      "percentage": false,
+      "pointradius": 5,
+      "points": false,
+      "renderer": "flot",
+      "seriesOverrides": [],
+      "spaceLength": 10,
+      "stack": false,
+      "steppedLine": false,
+      "targets": [
+        {
+          "expr": 
"sum(service_center_notify_pending_total{job=\"service-center\"}) by 
(instance,source)",
+          "format": "time_series",
+          "intervalFactor": 1,
+          "refId": "A"
+        }
+      ],
+      "thresholds": [],
+      "timeFrom": null,
+      "timeShift": null,
+      "title": "Pending Events",
+      "tooltip": {
+        "shared": true,
+        "sort": 0,
+        "value_type": "individual"
+      },
+      "type": "graph",
+      "xaxis": {
+        "buckets": null,
+        "mode": "time",
+        "name": null,
+        "show": true,
+        "values": []
+      },
+      "yaxes": [
+        {
+          "format": "short",
+          "label": null,
+          "logBase": 1,
+          "max": null,
+          "min": null,
+          "show": true
+        },
+        {
+          "format": "short",
+          "label": null,
+          "logBase": 1,
+          "max": null,
+          "min": null,
+          "show": true
+        }
+      ],
+      "yaxis": {
+        "align": false,
+        "alignLevel": null
+      }
+    },
+    {
+      "aliasColors": {},
+      "bars": false,
+      "dashLength": 10,
+      "dashes": false,
+      "datasource": "${DS_LOCAL}",
+      "fill": 1,
+      "gridPos": {
+        "h": 6,
+        "w": 6,
+        "x": 6,
+        "y": 40
+      },
       "height": "",
-      "id": 37,
+      "id": 43,
       "legend": {
         "alignAsTable": false,
         "avg": false,
@@ -2558,18 +2626,17 @@
       "steppedLine": false,
       "targets": [
         {
-          "expr": 
"sum(irate(service_center_db_backend_operation_total{job=\"service-center\"}[1m]))
 by (instance,operation)",
+          "expr": 
"service_center_notify_subscriber_total{job=\"service-center\"}",
           "format": "time_series",
-          "instant": false,
           "intervalFactor": 1,
-          "legendFormat": "{{instance}}> {{operation}}",
-          "refId": "A"
+          "legendFormat": "{{instance}}> {{domain}} {{scheme}}",
+          "refId": "B"
         }
       ],
       "thresholds": [],
       "timeFrom": null,
       "timeShift": null,
-      "title": "Backend Operations",
+      "title": "Subscribers",
       "tooltip": {
         "shared": true,
         "sort": 0,
@@ -2587,7 +2654,7 @@
       "yaxes": [
         {
           "decimals": 0,
-          "format": "ops",
+          "format": "none",
           "label": null,
           "logBase": 1,
           "max": null,
@@ -2618,25 +2685,25 @@
       "gridPos": {
         "h": 6,
         "w": 6,
-        "x": 6,
+        "x": 12,
         "y": 40
       },
       "height": "",
-      "id": 38,
+      "id": 37,
       "legend": {
         "alignAsTable": false,
-        "avg": true,
+        "avg": false,
         "current": false,
         "hideEmpty": true,
         "hideZero": true,
-        "max": true,
-        "min": true,
-        "rightSide": true,
+        "max": false,
+        "min": false,
+        "rightSide": false,
         "show": false,
         "sort": "avg",
         "sortDesc": true,
         "total": false,
-        "values": true
+        "values": false
       },
       "lines": true,
       "linewidth": 1,
@@ -2653,17 +2720,18 @@
       "steppedLine": false,
       "targets": [
         {
-          "expr": 
"max(avg_over_time(service_center_db_backend_operation_durations_microseconds{job=\"service-center\"}[1m]))
 by (instance,operation)",
+          "expr": 
"sum(irate(service_center_db_backend_operation_total{job=\"service-center\"}[1m]))
 by (instance,operation)",
           "format": "time_series",
+          "instant": false,
           "intervalFactor": 1,
           "legendFormat": "{{instance}}> {{operation}}",
-          "refId": "B"
+          "refId": "A"
         }
       ],
       "thresholds": [],
       "timeFrom": null,
       "timeShift": null,
-      "title": "Backend Operations Latency",
+      "title": "Backend Operations",
       "tooltip": {
         "shared": true,
         "sort": 0,
@@ -2680,7 +2748,8 @@
       },
       "yaxes": [
         {
-          "format": "µs",
+          "decimals": 0,
+          "format": "ops",
           "label": null,
           "logBase": 1,
           "max": null,
@@ -2711,25 +2780,25 @@
       "gridPos": {
         "h": 6,
         "w": 6,
-        "x": 12,
+        "x": 18,
         "y": 40
       },
       "height": "",
-      "id": 43,
+      "id": 38,
       "legend": {
         "alignAsTable": false,
-        "avg": false,
+        "avg": true,
         "current": false,
         "hideEmpty": true,
         "hideZero": true,
-        "max": false,
-        "min": false,
-        "rightSide": false,
+        "max": true,
+        "min": true,
+        "rightSide": true,
         "show": false,
         "sort": "avg",
         "sortDesc": true,
         "total": false,
-        "values": false
+        "values": true
       },
       "lines": true,
       "linewidth": 1,
@@ -2746,17 +2815,17 @@
       "steppedLine": false,
       "targets": [
         {
-          "expr": 
"service_center_notify_subscriber_total{job=\"service-center\"}",
+          "expr": 
"max(avg_over_time(service_center_db_backend_operation_durations_microseconds{job=\"service-center\"}[1m]))
 by (instance,operation)",
           "format": "time_series",
           "intervalFactor": 1,
-          "legendFormat": "{{instance}}> {{domain}} {{scheme}}",
+          "legendFormat": "{{instance}}> {{operation}}",
           "refId": "B"
         }
       ],
       "thresholds": [],
       "timeFrom": null,
       "timeShift": null,
-      "title": "Subscribers",
+      "title": "Backend Operations Latency",
       "tooltip": {
         "shared": true,
         "sort": 0,
@@ -2773,8 +2842,7 @@
       },
       "yaxes": [
         {
-          "decimals": 0,
-          "format": "none",
+          "format": "µs",
           "label": null,
           "logBase": 1,
           "max": null,
@@ -3434,7 +3502,7 @@
     "list": []
   },
   "time": {
-    "from": "now-5m",
+    "from": "now-30m",
     "to": "now"
   },
   "timepicker": {
@@ -3465,5 +3533,5 @@
   "timezone": "",
   "title": "ServiceCenter",
   "uid": "Zg6NoHGiz",
-  "version": 12
+  "version": 17
 }
\ No newline at end of file
diff --git a/integration/instances_test.go b/integration/instances_test.go
index d4fd6b0..1f24fc7 100644
--- a/integration/instances_test.go
+++ b/integration/instances_test.go
@@ -18,8 +18,13 @@ package integrationtest_test
 
 import (
        "encoding/json"
+       "fmt"
+       "github.com/go-chassis/cari/discovery"
+       "github.com/gorilla/websocket"
+       "github.com/stretchr/testify/assert"
        "net/http"
        "strings"
+       "sync"
 
        . "github.com/onsi/ginkgo"
        . "github.com/onsi/gomega"
@@ -601,18 +606,6 @@ var _ = Describe("MicroService Api Test", func() {
 
                                
Expect(resp.StatusCode).To(Equal(http.StatusBadRequest))
                        })
-                       It("Call the listwatcher API ", func() {
-                               //This api gives 400 bad request for the 
integration test
-                               // as integration test is not able to make ws 
connection
-                               url := strings.Replace(INSTANCELISTWATCHER, 
":serviceId", serviceId, 1)
-                               req, _ := http.NewRequest(GET, SCURL+url, nil)
-                               req.Header.Set("X-Domain-Name", "default")
-                               resp, err := scclient.Do(req)
-                               Expect(err).To(BeNil())
-                               defer resp.Body.Close()
-
-                               
Expect(resp.StatusCode).To(Equal(http.StatusBadRequest))
-                       })
                })
        })
 
@@ -705,3 +698,119 @@ func BenchmarkRegisterMicroServiceInstance(b *testing.B) {
                Expect(resp.StatusCode).To(Equal(http.StatusOK))
        }
 }
+
+func BenchmarkInstanceWatch(t *testing.B) {
+       scclient = insecurityConnection
+       var serviceId, instanceId string
+
+       t.Run("prepare data", func(t *testing.B) {
+               // service
+               serviceName := "testInstance" + strconv.Itoa(rand.Int())
+               servicemap := map[string]interface{}{
+                       "serviceName": serviceName,
+                       "appId":       "testApp",
+                       "version":     "1.0",
+               }
+               bodyParams := map[string]interface{}{
+                       "service": servicemap,
+               }
+               body, _ := json.Marshal(bodyParams)
+               bodyBuf := bytes.NewReader(body)
+               req, _ := http.NewRequest(POST, SCURL+REGISTERMICROSERVICE, 
bodyBuf)
+               req.Header.Set("X-Domain-Name", "default")
+               resp, err := scclient.Do(req)
+               assert.NoError(t, err)
+               assert.Equal(t, http.StatusOK, resp.StatusCode)
+               respbody, _ := ioutil.ReadAll(resp.Body)
+               serviceId = 
gojson.Json(string(respbody)).Get("serviceId").Tostring()
+               resp.Body.Close()
+
+               // instance
+               healthcheck := map[string]interface{}{
+                       "mode":     "push",
+                       "interval": 30000,
+                       "times":    20000,
+               }
+               instance := map[string]interface{}{
+                       "hostName":    "cse",
+                       "healthCheck": healthcheck,
+               }
+               bodyParams = map[string]interface{}{
+                       "instance": instance,
+               }
+               body, _ = json.Marshal(bodyParams)
+               bodyBuf = bytes.NewReader(body)
+               req, _ = http.NewRequest(POST, 
SCURL+strings.Replace(REGISTERINSTANCE, ":serviceId", serviceId, 1), bodyBuf)
+               req.Header.Set("X-Domain-Name", "default")
+               resp, err = scclient.Do(req)
+               assert.NoError(t, err)
+               respbody, _ = ioutil.ReadAll(resp.Body)
+               instanceId = 
gojson.Json(string(respbody)).Get("instanceId").Tostring()
+               resp.Body.Close()
+
+               req, _ = http.NewRequest(GET, 
SCURL+FINDINSTANCE+"?appId=testApp&serviceName="+serviceName+"&version=1.0", 
nil)
+               req.Header.Set("X-Domain-Name", "default")
+               req.Header.Set("X-ConsumerId", serviceId)
+               resp, err = scclient.Do(req)
+               assert.NoError(t, err)
+               resp.Body.Close()
+
+               assert.Equal(t, http.StatusOK, resp.StatusCode)
+       })
+
+       t.Run("test 10K connection", func(t *testing.B) {
+
+               const N, E = 2500, 2500
+               var okWg sync.WaitGroup
+               okWg.Add(N)
+
+               t.Run("new 10K connection", func(t *testing.B) {
+                       url := strings.ReplaceAll(strings.ReplaceAll(SCURL, 
"http://", "ws://")+INSTANCEWATCHER, ":serviceId", serviceId)
+                       for i := 0; i < N; i++ {
+                               go func() {
+                                       conn, _, err := 
websocket.DefaultDialer.Dial(url, nil)
+                                       assert.NoError(t, err)
+                                       for {
+                                               _ = 
conn.SetReadDeadline(time.Now().Add(30 * time.Second))
+                                               _, data, err := 
conn.ReadMessage()
+                                               if err != nil {
+                                                       okWg.Done()
+                                                       return
+                                               }
+
+                                               var response 
discovery.WatchInstanceResponse
+                                               _ = json.Unmarshal(data, 
&response)
+                                               instance := response.Instance
+                                               timestamp, _ := 
strconv.ParseInt(instance.ModTimestamp, 10, 64)
+                                               sub := 
time.Now().Sub(time.Unix(timestamp, 0))
+                                               
fmt.Println(instance.Properties["tag"], sub)
+                                       }
+                               }()
+                       }
+                       <-time.After(10 * time.Second)
+               })
+
+               t.Run("fire 10K event", func(t *testing.B) {
+                       for i := 0; i < E; i++ {
+                               propertiesInstance := map[string]interface{}{
+                                       "tag": strconv.Itoa(i),
+                               }
+                               bodyParams := map[string]interface{}{
+                                       "properties": propertiesInstance,
+                               }
+                               url := strings.Replace(UPDATEINSTANCEMETADATA, 
":serviceId", serviceId, 1)
+                               url = strings.Replace(url, ":instanceId", 
instanceId, 1)
+                               body, _ := json.Marshal(bodyParams)
+                               bodyBuf := bytes.NewReader(body)
+                               req, _ := http.NewRequest(UPDATE, SCURL+url, 
bodyBuf)
+                               req.Header.Set("X-Domain-Name", "default")
+                               resp, _ := scclient.Do(req)
+                               resp.Body.Close()
+                       }
+               })
+
+               t.Run("wait", func(t *testing.B) {
+                       okWg.Wait()
+               })
+       })
+}
diff --git a/pkg/event/bus.go b/pkg/event/bus.go
index a573381..4e7fe77 100644
--- a/pkg/event/bus.go
+++ b/pkg/event/bus.go
@@ -41,14 +41,14 @@ func (bus *Bus) Fire(evt Event) {
        bus.Add(queue.Task{Payload: evt})
 }
 
-func (bus *Bus) Handle(ctx context.Context, evt interface{}) {
-       bus.fireAtOnce(evt.(Event))
+func (bus *Bus) Handle(ctx context.Context, payload interface{}) {
+       bus.fireAtOnce(payload.(Event))
 }
 
 func (bus *Bus) fireAtOnce(evt Event) {
        if itf, ok := bus.subjects.Get(evt.Subject()); ok {
                itf.(*Poster).Post(evt)
-       }
+       } // otherwise the event has no subscribers and is discarded
 }
 
 func (bus *Bus) Subjects(name string) *Poster {
diff --git a/pkg/event/subscriber.go b/pkg/event/subscriber.go
index b26060b..df771dd 100644
--- a/pkg/event/subscriber.go
+++ b/pkg/event/subscriber.go
@@ -31,12 +31,15 @@ type Subscriber interface {
        Bus() *BusService
        SetBus(*BusService)
 
+       // Err returns the subscriber's error; if it is non-nil, the event bus removes the subscriber automatically.
+       // Implementations of OnMessage should call SetError when an error occurs.
        Err() error
        SetError(err error)
 
        Close()
+       // OnAccept is called after the subscriber has been added to the event bus successfully.
        OnAccept()
-       // The event bus will callback this function, so it must be non-blocked.
+       // OnMessage is called when the event bus fires a message; it must not block.
        OnMessage(Event)
 }
 
diff --git a/pkg/proto/service_ex.go b/pkg/proto/service_ex.go
index ce955d3..64c0b9e 100644
--- a/pkg/proto/service_ex.go
+++ b/pkg/proto/service_ex.go
@@ -30,7 +30,6 @@ type ServiceInstanceCtrlServerEx interface {
        BatchFind(ctx context.Context, in *discovery.BatchFindInstancesRequest) 
(*discovery.BatchFindInstancesResponse, error)
 
        WebSocketWatch(ctx context.Context, in *discovery.WatchInstanceRequest, 
conn *websocket.Conn)
-       WebSocketListAndWatch(ctx context.Context, in 
*discovery.WatchInstanceRequest, conn *websocket.Conn)
 
        ClusterHealth(ctx context.Context) (*discovery.GetInstancesResponse, 
error)
 }
diff --git a/pkg/util/context.go b/pkg/util/context.go
index b77bbc2..2f5b621 100644
--- a/pkg/util/context.go
+++ b/pkg/util/context.go
@@ -20,6 +20,7 @@ package util
 import (
        "context"
        "net/http"
+       "strings"
        "time"
 )
 
@@ -28,6 +29,7 @@ const (
        CtxProject       CtxKey = "project"
        CtxTargetDomain  CtxKey = "target-domain"
        CtxTargetProject CtxKey = "target-project"
+       SPLIT                   = "/"
 )
 
 type StringContext struct {
@@ -119,11 +121,11 @@ func SetRequestContext(r *http.Request, key CtxKey, val 
interface{}) *http.Reque
 }
 
 func ParseDomainProject(ctx context.Context) string {
-       return ParseDomain(ctx) + "/" + ParseProject(ctx)
+       return ParseDomain(ctx) + SPLIT + ParseProject(ctx)
 }
 
 func ParseTargetDomainProject(ctx context.Context) string {
-       return ParseTargetDomain(ctx) + "/" + ParseTargetProject(ctx)
+       return ParseTargetDomain(ctx) + SPLIT + ParseTargetProject(ctx)
 }
 
 func ParseDomain(ctx context.Context) string {
@@ -178,6 +180,14 @@ func SetDomainProject(ctx context.Context, domain string, 
project string) contex
        return SetProject(SetDomain(ctx, domain), project)
 }
 
+func SetDomainProjectString(ctx context.Context, domainProject string) 
context.Context {
+       arr := strings.Split(domainProject, SPLIT)
+       if len(arr) != 2 {
+               return ctx
+       }
+       return SetProject(SetDomain(ctx, arr[0]), arr[1])
+}
+
 func SetTargetDomainProject(ctx context.Context, domain string, project 
string) context.Context {
        return SetTargetProject(SetTargetDomain(ctx, domain), project)
 }
diff --git a/server/connection/grpc/stream.go b/server/connection/grpc/stream.go
index b74acc3..b786c8f 100644
--- a/server/connection/grpc/stream.go
+++ b/server/connection/grpc/stream.go
@@ -28,12 +28,11 @@ import (
        "github.com/apache/servicecomb-service-center/server/connection"
        "github.com/apache/servicecomb-service-center/server/event"
        "github.com/apache/servicecomb-service-center/server/metrics"
-       pb "github.com/go-chassis/cari/discovery"
 )
 
 const GRPC = "gRPC"
 
-func Handle(watcher *event.InstanceEventListWatcher, stream 
proto.ServiceInstanceCtrlWatchServer) (err error) {
+func Handle(watcher *event.InstanceSubscriber, stream 
proto.ServiceInstanceCtrlWatchServer) (err error) {
        timer := time.NewTimer(connection.HeartbeatInterval)
        defer timer.Stop()
        for {
@@ -69,10 +68,10 @@ func Handle(watcher *event.InstanceEventListWatcher, stream 
proto.ServiceInstanc
        }
 }
 
-func ListAndWatch(ctx context.Context, serviceID string, f func() 
([]*pb.WatchInstanceResponse, int64), stream 
proto.ServiceInstanceCtrlWatchServer) (err error) {
+func Watch(ctx context.Context, serviceID string, stream 
proto.ServiceInstanceCtrlWatchServer) (err error) {
        domainProject := util.ParseDomainProject(ctx)
        domain := util.ParseDomain(ctx)
-       watcher := event.NewInstanceEventListWatcher(serviceID, domainProject, 
f)
+       watcher := event.NewInstanceSubscriber(serviceID, domainProject)
        err = event.Center().AddSubscriber(watcher)
        if err != nil {
                return
diff --git a/server/connection/grpc/stream_test.go 
b/server/connection/grpc/stream_test.go
index 3381167..3bb7d78 100644
--- a/server/connection/grpc/stream_test.go
+++ b/server/connection/grpc/stream_test.go
@@ -42,7 +42,7 @@ func (x *grpcWatchServer) Context() context.Context {
 }
 
 func TestHandleWatchJob(t *testing.T) {
-       w := event.NewInstanceEventListWatcher("g", "s", nil)
+       w := event.NewInstanceSubscriber("g", "s")
        w.Job <- nil
        err := stream.Handle(w, &grpcWatchServer{})
        if err == nil {
@@ -55,6 +55,6 @@ func TestHandleWatchJob(t *testing.T) {
 
 func TestDoStreamListAndWatch(t *testing.T) {
        defer log.Recover()
-       err := stream.ListAndWatch(context.Background(), "s", nil, nil)
+       err := stream.Watch(context.Background(), "s", nil)
        t.Fatal("TestDoStreamListAndWatch failed", err)
 }
diff --git a/server/connection/ws/broker.go b/server/connection/ws/broker.go
new file mode 100644
index 0000000..1501945
--- /dev/null
+++ b/server/connection/ws/broker.go
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package ws
+
+import (
+       "context"
+       "encoding/json"
+       "fmt"
+
+       "github.com/apache/servicecomb-service-center/pkg/log"
+       "github.com/apache/servicecomb-service-center/pkg/util"
+       "github.com/apache/servicecomb-service-center/server/event"
+       "github.com/apache/servicecomb-service-center/server/metrics"
+       pb "github.com/go-chassis/cari/discovery"
+)
+
+var errChanClosed = fmt.Errorf("chan closed")
+
+type Broker struct {
+       consumer *WebSocket
+       producer *event.InstanceSubscriber
+}
+
+func (b *Broker) Listen(ctx context.Context) error {
+       for {
+               select {
+               case <-ctx.Done():
+                       return ctx.Err()
+               case instanceEvent, ok := <-b.producer.Job:
+                       if !ok {
+                               return errChanClosed
+                       }
+                       err := b.write(instanceEvent)
+                       if err != nil {
+                               return err
+                       }
+               }
+       }
+}
+func (b *Broker) write(evt *event.InstanceEvent) error {
+       resp := evt.Response
+       providerFlag := fmt.Sprintf("%s/%s/%s", resp.Key.AppId, 
resp.Key.ServiceName, resp.Key.Version)
+       if resp.Action != string(pb.EVT_EXPIRE) {
+               providerFlag = fmt.Sprintf("%s/%s(%s)", 
resp.Instance.ServiceId, resp.Instance.InstanceId, providerFlag)
+       }
+       remoteAddr := b.consumer.Conn.RemoteAddr().String()
+       log.Infof("event[%s] is coming in, subscriber[%s] watch %s, group: %s",
+               resp.Action, remoteAddr, providerFlag, b.producer.Group())
+
+       resp.Response = nil
+       data, err := json.Marshal(resp)
+       if err != nil {
+               log.Errorf(err, "subscriber[%s] watch %s, group: %s", 
remoteAddr, providerFlag, b.producer.Group())
+               data = util.StringToBytesWithNoCopy(fmt.Sprintf("marshal output 
file error, %s", err.Error()))
+       }
+       err = b.consumer.WriteTextMessage(data)
+       metrics.ReportPublishCompleted(evt, err)
+       return err
+}
+
+func NewBroker(ws *WebSocket, is *event.InstanceSubscriber) *Broker {
+       return &Broker{
+               consumer: ws,
+               producer: is,
+       }
+}
diff --git a/pkg/proto/service_ex.go b/server/connection/ws/broker_test.go
similarity index 55%
copy from pkg/proto/service_ex.go
copy to server/connection/ws/broker_test.go
index ce955d3..a3597c3 100644
--- a/pkg/proto/service_ex.go
+++ b/server/connection/ws/broker_test.go
@@ -15,22 +15,28 @@
  * limitations under the License.
  */
 
-package proto
+package ws_test
 
 import (
        "context"
-
-       "github.com/go-chassis/cari/discovery"
-       "github.com/gorilla/websocket"
+       "github.com/apache/servicecomb-service-center/server/connection/ws"
+       "github.com/apache/servicecomb-service-center/server/event"
+       "github.com/stretchr/testify/assert"
+       "testing"
 )
 
-type ServiceInstanceCtrlServerEx interface {
-       ServiceInstanceCtrlServer
-
-       BatchFind(ctx context.Context, in *discovery.BatchFindInstancesRequest) 
(*discovery.BatchFindInstancesResponse, error)
+func TestNewBroker(t *testing.T) {
+       t.Run("should not return nil when new broker", func(t *testing.T) {
+               assert.NotNil(t, ws.NewBroker(nil, nil))
 
-       WebSocketWatch(ctx context.Context, in *discovery.WatchInstanceRequest, 
conn *websocket.Conn)
-       WebSocketListAndWatch(ctx context.Context, in 
*discovery.WatchInstanceRequest, conn *websocket.Conn)
+       })
+}
 
-       ClusterHealth(ctx context.Context) (*discovery.GetInstancesResponse, 
error)
+func TestBroker_Listen(t *testing.T) {
+       t.Run("should return err when listen context cancelled", func(t 
*testing.T) {
+               broker := ws.NewBroker(nil, event.NewInstanceSubscriber("", ""))
+               ctx, cancel := context.WithCancel(context.Background())
+               cancel()
+               assert.Equal(t, context.Canceled, broker.Listen(ctx))
+       })
 }
diff --git a/server/connection/ws/common.go b/server/connection/ws/common.go
new file mode 100644
index 0000000..d9e709c
--- /dev/null
+++ b/server/connection/ws/common.go
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package ws
+
+import (
+       "context"
+       "fmt"
+
+       "github.com/apache/servicecomb-service-center/pkg/gopool"
+       "github.com/apache/servicecomb-service-center/pkg/log"
+       "github.com/apache/servicecomb-service-center/pkg/util"
+       "github.com/apache/servicecomb-service-center/server/event"
+       "github.com/apache/servicecomb-service-center/server/metrics"
+       "github.com/gorilla/websocket"
+)
+
+func Watch(ctx context.Context, serviceID string, conn *websocket.Conn) {
+       domainProject := util.ParseDomainProject(ctx)
+       domain := util.ParseDomain(ctx)
+
+       ws := NewWebSocket(domainProject, serviceID, conn)
+       HealthChecker().Accept(ws)
+
+       subscriber := event.NewInstanceSubscriber(serviceID, domainProject)
+       err := event.Center().AddSubscriber(subscriber)
+       if err != nil {
+               SendEstablishError(conn, err)
+               return
+       }
+
+       metrics.ReportSubscriber(domain, Websocket, 1)
+       defer metrics.ReportSubscriber(domain, Websocket, -1)
+
+       pool := gopool.New(ctx).Do(func(ctx context.Context) {
+               if err := NewBroker(ws, subscriber).Listen(ctx); err != nil {
+                       log.Error(fmt.Sprintf("[%s] listen service[%s] failed", 
conn.RemoteAddr(), serviceID), err)
+               }
+       })
+       defer pool.Done()
+
+       if err := ws.ReadMessage(); err != nil {
+               log.Error(fmt.Sprintf("read subscriber[%s][%s] message failed", 
serviceID, conn.RemoteAddr()), err)
+               subscriber.SetError(err)
+       }
+}
+
+func SendEstablishError(conn *websocket.Conn, err error) {
+       remoteAddr := conn.RemoteAddr().String()
+       log.Errorf(err, "establish[%s] websocket watch failed.", remoteAddr)
+       if err := conn.WriteMessage(websocket.TextMessage, 
util.StringToBytesWithNoCopy(err.Error())); err != nil {
+               log.Errorf(err, "establish[%s] websocket watch failed: write 
message failed.", remoteAddr)
+       }
+}
diff --git a/pkg/proto/service_ex.go b/server/connection/ws/common_test.go
similarity index 53%
copy from pkg/proto/service_ex.go
copy to server/connection/ws/common_test.go
index ce955d3..df8ae55 100644
--- a/pkg/proto/service_ex.go
+++ b/server/connection/ws/common_test.go
@@ -15,22 +15,33 @@
  * limitations under the License.
  */
 
-package proto
+package ws_test
 
 import (
        "context"
+       "errors"
+       "testing"
 
-       "github.com/go-chassis/cari/discovery"
-       "github.com/gorilla/websocket"
+       wss "github.com/apache/servicecomb-service-center/server/connection/ws"
+       "github.com/stretchr/testify/assert"
 )
 
-type ServiceInstanceCtrlServerEx interface {
-       ServiceInstanceCtrlServer
-
-       BatchFind(ctx context.Context, in *discovery.BatchFindInstancesRequest) 
(*discovery.BatchFindInstancesResponse, error)
-
-       WebSocketWatch(ctx context.Context, in *discovery.WatchInstanceRequest, 
conn *websocket.Conn)
-       WebSocketListAndWatch(ctx context.Context, in 
*discovery.WatchInstanceRequest, conn *websocket.Conn)
+func TestSendEstablishError(t *testing.T) {
+       mock := NewTest()
+       t.Run("should read the err when call", func(t *testing.T) {
+               wss.SendEstablishError(mock.ServerConn, errors.New("error"))
+               _, message, err := mock.ClientConn.ReadMessage()
+               assert.Nil(t, err)
+               assert.Equal(t, "error", string(message))
+       })
+}
 
-       ClusterHealth(ctx context.Context) (*discovery.GetInstancesResponse, 
error)
+func TestWatch(t *testing.T) {
+       t.Run("should return when ctx cancelled", func(t *testing.T) {
+               mock := NewTest()
+               mock.ServerConn.Close()
+               ctx, cancel := context.WithCancel(context.Background())
+               cancel()
+               wss.Watch(ctx, "", mock.ServerConn)
+       })
 }
diff --git a/server/connection/ws/publisher.go 
b/server/connection/ws/health_check.go
similarity index 57%
rename from server/connection/ws/publisher.go
rename to server/connection/ws/health_check.go
index 267ff24..aeb4f38 100644
--- a/server/connection/ws/publisher.go
+++ b/server/connection/ws/health_check.go
@@ -19,40 +19,36 @@ package ws
 
 import (
        "context"
+       "fmt"
        "sync"
        "time"
 
        "github.com/apache/servicecomb-service-center/pkg/gopool"
+       "github.com/apache/servicecomb-service-center/pkg/log"
 )
 
-var publisher *Publisher
+var checker *HealthCheck
 
 func init() {
-       publisher = NewPublisher()
-       publisher.Run()
+       checker = NewHealthCheck()
+       checker.Run()
 }
 
-type Publisher struct {
+type HealthCheck struct {
        wss       []*WebSocket
        lock      sync.Mutex
        goroutine *gopool.Pool
 }
 
-func (wh *Publisher) Run() {
-       gopool.Go(publisher.loop)
+func (wh *HealthCheck) Run() {
+       gopool.Go(checker.loop)
 }
 
-func (wh *Publisher) Stop() {
+func (wh *HealthCheck) Stop() {
        wh.goroutine.Close(true)
 }
 
-func (wh *Publisher) dispatch(ws *WebSocket, payload interface{}) {
-       wh.goroutine.Do(func(ctx context.Context) {
-               ws.HandleEvent(payload)
-       })
-}
-
-func (wh *Publisher) loop(ctx context.Context) {
+func (wh *HealthCheck) loop(ctx context.Context) {
        defer wh.Stop()
        ticker := time.NewTicker(500 * time.Millisecond)
        for {
@@ -61,49 +57,52 @@ func (wh *Publisher) loop(ctx context.Context) {
                        // server shutdown
                        return
                case <-ticker.C:
-                       var removes []int
-                       for i, ws := range wh.wss {
-                               if payload := ws.Pick(); payload != nil {
-                                       if _, ok := payload.(error); ok {
-                                               removes = append(removes, i)
-                                       }
-                                       wh.dispatch(ws, payload)
+                       for _, ws := range wh.wss {
+                               if t := ws.NeedCheck(); t == nil {
+                                       continue
                                }
+                               wh.check(ws)
                        }
-                       if len(removes) == 0 {
-                               continue
-                       }
-
-                       wh.lock.Lock()
-                       var (
-                               news []*WebSocket
-                               s    int
-                       )
-                       for _, e := range removes {
-                               news = append(news, wh.wss[s:e]...)
-                               s = e + 1
-                       }
-                       if s < len(wh.wss) {
-                               news = append(news, wh.wss[s:]...)
-                       }
-                       wh.wss = news
-                       wh.lock.Unlock()
                }
        }
 }
 
-func (wh *Publisher) Accept(ws *WebSocket) {
+func (wh *HealthCheck) check(ws *WebSocket) {
+       wh.goroutine.Do(func(ctx context.Context) {
+               if err := ws.CheckHealth(ctx); err != nil {
+                       wh.Remove(ws)
+                       log.Error(fmt.Sprintf("checker removed unhealth 
websocket[%s]", ws.RemoteAddr), err)
+               }
+       })
+}
+
+func (wh *HealthCheck) Accept(ws *WebSocket) int {
        wh.lock.Lock()
        wh.wss = append(wh.wss, ws)
+       n := len(wh.wss)
+       wh.lock.Unlock()
+       return n
+}
+
+func (wh *HealthCheck) Remove(ws *WebSocket) int {
+       wh.lock.Lock()
+       for i, t := range wh.wss {
+               if t == ws {
+                       wh.wss = append(wh.wss[0:i], wh.wss[i+1:]...)
+                       break
+               }
+       }
+       n := len(wh.wss)
        wh.lock.Unlock()
+       return n
 }
 
-func NewPublisher() *Publisher {
-       return &Publisher{
+func NewHealthCheck() *HealthCheck {
+       return &HealthCheck{
                goroutine: gopool.New(context.Background()),
        }
 }
 
-func Instance() *Publisher {
-       return publisher
+func HealthChecker() *HealthCheck {
+       return checker
 }
diff --git a/server/rest/controller/v3/instance_watcher.go 
b/server/connection/ws/health_check_test.go
similarity index 58%
copy from server/rest/controller/v3/instance_watcher.go
copy to server/connection/ws/health_check_test.go
index 7395c4c..7ab0578 100644
--- a/server/rest/controller/v3/instance_watcher.go
+++ b/server/connection/ws/health_check_test.go
@@ -14,20 +14,28 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package v3
+
+package ws_test
 
 import (
-       "github.com/apache/servicecomb-service-center/pkg/rest"
-       "github.com/apache/servicecomb-service-center/server/rest/controller/v4"
+       "github.com/apache/servicecomb-service-center/server/connection/ws"
+       "github.com/stretchr/testify/assert"
+       "testing"
 )
 
-type WatchService struct {
-       v4.WatchService
+func TestNewHealthCheck(t *testing.T) {
+       t.Run("should not return nil when new", func(t *testing.T) {
+               assert.NotNil(t, ws.NewHealthCheck())
+       })
 }
 
-func (this *WatchService) URLPatterns() []rest.Route {
-       return []rest.Route{
-               {rest.HTTPMethodGet, 
"/registry/v3/microservices/:serviceId/watcher", this.Watch},
-               {rest.HTTPMethodGet, 
"/registry/v3/microservices/:serviceId/listwatcher", this.ListAndWatch},
-       }
+func TestHealthCheck_Run(t *testing.T) {
+       mock := NewTest()
+
+       t.Run("should return 1 when accept one ws", func(t *testing.T) {
+               check := ws.NewHealthCheck()
+               webSocket := ws.NewWebSocket("", "", mock.ServerConn)
+               assert.Equal(t, 1, check.Accept(webSocket))
+               assert.Equal(t, 0, check.Remove(webSocket))
+       })
 }
diff --git a/server/rest/controller/v3/instance_watcher.go 
b/server/connection/ws/options.go
similarity index 64%
copy from server/rest/controller/v3/instance_watcher.go
copy to server/connection/ws/options.go
index 7395c4c..3196623 100644
--- a/server/rest/controller/v3/instance_watcher.go
+++ b/server/connection/ws/options.go
@@ -14,20 +14,25 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package v3
+
+package ws
 
 import (
-       "github.com/apache/servicecomb-service-center/pkg/rest"
-       "github.com/apache/servicecomb-service-center/server/rest/controller/v4"
+       "time"
+
+       "github.com/apache/servicecomb-service-center/server/connection"
 )
 
-type WatchService struct {
-       v4.WatchService
+type Options struct {
+       ReadTimeout    time.Duration
+       SendTimeout    time.Duration
+       HealthInterval time.Duration
 }
 
-func (this *WatchService) URLPatterns() []rest.Route {
-       return []rest.Route{
-               {rest.HTTPMethodGet, 
"/registry/v3/microservices/:serviceId/watcher", this.Watch},
-               {rest.HTTPMethodGet, 
"/registry/v3/microservices/:serviceId/listwatcher", this.ListAndWatch},
+func ToOptions() Options {
+       return Options{
+               ReadTimeout:    connection.ReadTimeout,
+               SendTimeout:    connection.SendTimeout,
+               HealthInterval: connection.HeartbeatInterval,
        }
 }
diff --git a/server/connection/ws/websocket.go 
b/server/connection/ws/websocket.go
index 57b064c..e9fe3c1 100644
--- a/server/connection/ws/websocket.go
+++ b/server/connection/ws/websocket.go
@@ -19,311 +19,190 @@ package ws
 
 import (
        "context"
-       "encoding/json"
        "fmt"
+       "github.com/apache/servicecomb-service-center/pkg/util"
        "time"
 
        "github.com/apache/servicecomb-service-center/datasource"
        "github.com/apache/servicecomb-service-center/pkg/log"
-       "github.com/apache/servicecomb-service-center/pkg/util"
        "github.com/apache/servicecomb-service-center/server/connection"
-       "github.com/apache/servicecomb-service-center/server/event"
-       "github.com/apache/servicecomb-service-center/server/metrics"
        pb "github.com/go-chassis/cari/discovery"
        "github.com/gorilla/websocket"
 )
 
 const Websocket = "Websocket"
 
-type WebSocket struct {
-       ctx    context.Context
-       ticker *time.Ticker
-       conn   *websocket.Conn
-       // watcher subscribe the notification service event
-       watcher         *event.InstanceEventListWatcher
-       needPingWatcher bool
-       free            chan struct{}
-       closed          chan struct{}
-}
-
-func (wh *WebSocket) Init() error {
-       wh.ticker = time.NewTicker(connection.HeartbeatInterval)
-       wh.needPingWatcher = true
-       wh.free = make(chan struct{}, 1)
-       wh.closed = make(chan struct{})
+var errServiceNotExist = fmt.Errorf("Service does not exist.")
 
-       wh.SetReady()
-
-       remoteAddr := wh.conn.RemoteAddr().String()
-
-       // put in notification service queue
-       if err := event.Center().AddSubscriber(wh.watcher); err != nil {
-               err = fmt.Errorf("establish[%s] websocket watch failed: notify 
service error, %s",
-                       remoteAddr, err.Error())
-               log.Errorf(nil, err.Error())
-
-               werr := wh.conn.WriteMessage(websocket.TextMessage, 
util.StringToBytesWithNoCopy(err.Error()))
-               if werr != nil {
-                       log.Errorf(werr, "establish[%s] websocket watch failed: 
write message failed.", remoteAddr)
-               }
-               return err
-       }
-
-       // put in publisher queue
-       Instance().Accept(wh)
+type WebSocket struct {
+       Options
+       Conn          *websocket.Conn
+       RemoteAddr    string
+       DomainProject string
+       ConsumerID    string
 
-       log.Debugf("start watching instance status, watcher[%s], subject: %s, 
group: %s",
-               remoteAddr, wh.watcher.Subject(), wh.watcher.Group())
-       return nil
+       ticker   *time.Ticker
+       needPing bool
+       idleCh   chan struct{}
 }
 
-func (wh *WebSocket) ReadTimeout() time.Duration {
-       return connection.ReadTimeout
-}
+func (wh *WebSocket) Init() {
+       wh.RemoteAddr = wh.Conn.RemoteAddr().String()
+       wh.ticker = time.NewTicker(wh.HealthInterval)
+       wh.needPing = true
+       wh.idleCh = make(chan struct{}, 1)
 
-func (wh *WebSocket) SendTimeout() time.Duration {
-       return connection.SendTimeout
-}
+       wh.registerMessageHandler()
 
-func (wh *WebSocket) Heartbeat(messageType int) error {
-       err := wh.conn.WriteControl(messageType, []byte{}, 
time.Now().Add(wh.SendTimeout()))
-       if err != nil {
-               messageTypeName := "Ping"
-               if messageType == websocket.PongMessage {
-                       messageTypeName = "Pong"
-               }
-               log.Errorf(err, "fail to send '%s' to watcher[%s], subject: %s, 
group: %s",
-                       messageTypeName, wh.conn.RemoteAddr(), 
wh.watcher.Subject(), wh.watcher.Group())
-               //wh.watcher.SetError(err)
-               return err
-       }
-       return nil
+       wh.SetIdle()
+
+       log.Debugf("start watching instance status, subscriber[%s], consumer: 
%s",
+               wh.RemoteAddr, wh.ConsumerID)
 }
 
-func (wh *WebSocket) HandleControlMessage() {
-       remoteAddr := wh.conn.RemoteAddr().String()
+func (wh *WebSocket) registerMessageHandler() {
+       remoteAddr := wh.RemoteAddr
        // PING
-       wh.conn.SetPingHandler(func(message string) error {
+       wh.Conn.SetPingHandler(func(message string) error {
                defer func() {
-                       err := 
wh.conn.SetReadDeadline(time.Now().Add(wh.ReadTimeout()))
+                       err := 
wh.Conn.SetReadDeadline(time.Now().Add(wh.ReadTimeout))
                        if err != nil {
                                log.Error("", err)
                        }
                }()
-               if wh.needPingWatcher {
-                       log.Infof("received 'Ping' message '%s' from 
watcher[%s], no longer send 'Ping' to it, subject: %s, group: %s",
-                               message, remoteAddr, wh.watcher.Subject(), 
wh.watcher.Group())
+               if wh.needPing {
+                       log.Infof("received 'Ping' message '%s' from 
subscriber[%s], no longer send 'Ping' to it, consumer: %s",
+                               message, remoteAddr, wh.ConsumerID)
                }
-               wh.needPingWatcher = false
-               return wh.Heartbeat(websocket.PongMessage)
+               wh.needPing = false
+               return wh.WritePingPong(websocket.PongMessage)
        })
        // PONG
-       wh.conn.SetPongHandler(func(message string) error {
+       wh.Conn.SetPongHandler(func(message string) error {
                defer func() {
-                       err := 
wh.conn.SetReadDeadline(time.Now().Add(wh.ReadTimeout()))
+                       err := 
wh.Conn.SetReadDeadline(time.Now().Add(wh.ReadTimeout))
                        if err != nil {
                                log.Error("", err)
                        }
                }()
-               log.Debugf("received 'Pong' message '%s' from watcher[%s], 
subject: %s, group: %s",
-                       message, remoteAddr, wh.watcher.Subject(), 
wh.watcher.Group())
+               log.Debugf("received 'Pong' message '%s' from subscriber[%s], 
consumer: %s",
+                       message, remoteAddr, wh.ConsumerID)
                return nil
        })
        // CLOSE
-       wh.conn.SetCloseHandler(func(code int, text string) error {
-               log.Infof("watcher[%s] active closed, code: %d, message: '%s', 
subject: %s, group: %s",
-                       remoteAddr, code, text, wh.watcher.Subject(), 
wh.watcher.Group())
+       wh.Conn.SetCloseHandler(func(code int, text string) error {
+               log.Infof("subscriber[%s] active closed, code: %d, message: 
'%s', consumer: %s",
+                       remoteAddr, code, text, wh.ConsumerID)
                return wh.sendClose(code, text)
        })
+}
 
-       wh.conn.SetReadLimit(connection.ReadMaxBody)
-       err := wh.conn.SetReadDeadline(time.Now().Add(wh.ReadTimeout()))
+func (wh *WebSocket) ReadMessage() error {
+       wh.Conn.SetReadLimit(connection.ReadMaxBody)
+       err := wh.Conn.SetReadDeadline(time.Now().Add(wh.ReadTimeout))
        if err != nil {
                log.Error("", err)
        }
        for {
-               _, _, err := wh.conn.ReadMessage()
+               _, _, err := wh.Conn.ReadMessage()
                if err != nil {
-                       // client close or conn broken
-                       wh.watcher.SetError(err)
-                       return
+                       return err
                }
        }
 }
 
 func (wh *WebSocket) sendClose(code int, text string) error {
-       remoteAddr := wh.conn.RemoteAddr().String()
+       remoteAddr := wh.Conn.RemoteAddr().String()
        var message []byte
        if code != websocket.CloseNoStatusReceived {
                message = websocket.FormatCloseMessage(code, text)
        }
-       err := wh.conn.WriteControl(websocket.CloseMessage, message, 
time.Now().Add(wh.SendTimeout()))
+       err := wh.Conn.WriteControl(websocket.CloseMessage, message, 
time.Now().Add(wh.SendTimeout))
        if err != nil {
-               log.Errorf(err, "watcher[%s] catch an err, subject: %s, group: 
%s",
-                       remoteAddr, wh.watcher.Subject(), wh.watcher.Group())
+               log.Errorf(err, "subscriber[%s] catch an err, consumer: %s",
+                       remoteAddr, wh.ConsumerID)
                return err
        }
        return nil
 }
 
-// Pick will be called by publisher
-func (wh *WebSocket) Pick() interface{} {
+// NeedCheck will be called by checker
+func (wh *WebSocket) NeedCheck() interface{} {
        select {
-       case <-wh.Ready():
-               if wh.watcher.Err() != nil {
-                       return wh.watcher.Err()
-               }
-
+       case <-wh.Idle():
                select {
                case t := <-wh.ticker.C:
                        return t
-               case j := <-wh.watcher.Job:
-                       if j == nil {
-                               return fmt.Errorf("server shutdown")
-                       }
-                       return j
                default:
-                       // reset if idle
-                       wh.SetReady()
+                       // reset if idleCh
+                       wh.SetIdle()
                }
        default:
        }
-
        return nil
 }
 
-// HandleEvent will be called if Pick() returns not nil
-func (wh *WebSocket) HandleEvent(o interface{}) {
-       defer wh.SetReady()
-
-       var (
-               message    []byte
-               remoteAddr = wh.conn.RemoteAddr().String()
-       )
-
-       switch o := o.(type) {
-       case error:
-               log.Errorf(o, "watcher[%s] catch an err, subject: %s, group: 
%s",
-                       remoteAddr, wh.watcher.Subject(), wh.watcher.Group())
-
-               message = util.StringToBytesWithNoCopy(fmt.Sprintf("watcher 
catch an err: %s", o.Error()))
-       case time.Time:
-               if exist, err := datasource.Instance().ExistServiceByID(wh.ctx, 
&pb.GetExistenceByIDRequest{
-                       ServiceId: wh.watcher.Group(),
-               }); err != nil || !exist.Exist {
-                       message = util.StringToBytesWithNoCopy("Service does 
not exit.")
-                       break
-               }
-
-               if !wh.needPingWatcher {
-                       return
-               }
+// CheckHealth will be called if NeedCheck() returns not nil
+func (wh *WebSocket) CheckHealth(ctx context.Context) error {
+       defer wh.SetIdle()
 
-               if err := wh.Heartbeat(websocket.PingMessage); err != nil {
-                       log.Errorf(err, "send 'Ping' message to watcher[%s] 
failed, subject: %s, group: %s",
-                               remoteAddr, wh.watcher.Subject(), 
wh.watcher.Group())
-                       return
-               }
-
-               log.Debugf("send 'Ping' message to watcher[%s], subject: %s, 
group: %s",
-                       remoteAddr, wh.watcher.Subject(), wh.watcher.Group())
-               return
-       case *event.InstanceEvent:
-               resp := o.Response
+       if !wh.needPing {
+               return nil
+       }
 
-               providerFlag := fmt.Sprintf("%s/%s/%s", resp.Key.AppId, 
resp.Key.ServiceName, resp.Key.Version)
-               if resp.Action != string(pb.EVT_EXPIRE) {
-                       providerFlag = fmt.Sprintf("%s/%s(%s)", 
resp.Instance.ServiceId, resp.Instance.InstanceId, providerFlag)
-               }
-               log.Infof("event[%s] is coming in, watcher[%s] watch %s, 
subject: %s, group: %s",
-                       resp.Action, remoteAddr, providerFlag, 
wh.watcher.Subject(), wh.watcher.Group())
+       ctx = util.SetDomainProjectString(ctx, wh.DomainProject)
 
-               resp.Response = nil
-               data, err := json.Marshal(resp)
-               if err != nil {
-                       log.Errorf(err, "watcher[%s] watch %s, subject: %s, 
group: %s",
-                               remoteAddr, providerFlag, o, 
wh.watcher.Subject(), wh.watcher.Group())
-                       message = 
util.StringToBytesWithNoCopy(fmt.Sprintf("marshal output file error, %s", 
err.Error()))
-                       break
-               }
-               message = data
-       default:
-               log.Errorf(nil, "watcher[%s] unknown input %v, subject: %s, 
group: %s",
-                       remoteAddr, o, wh.watcher.Subject(), wh.watcher.Group())
-               return
+       if exist, err := datasource.Instance().ExistServiceByID(ctx, 
&pb.GetExistenceByIDRequest{
+               ServiceId: wh.ConsumerID,
+       }); err != nil || !exist.Exist {
+               return errServiceNotExist
        }
 
-       select {
-       case <-wh.closed:
-               return
-       default:
+       remoteAddr := wh.Conn.RemoteAddr().String()
+       if err := wh.WritePingPong(websocket.PingMessage); err != nil {
+               return err
        }
 
-       err := wh.WriteMessage(message)
-       if evt, ok := o.(*event.InstanceEvent); ok {
-               metrics.ReportPublishCompleted(evt, err)
-       }
-       if err != nil {
-               log.Errorf(err, "watcher[%s] catch an err, subject: %s, group: 
%s",
-                       remoteAddr, wh.watcher.Subject(), wh.watcher.Group())
-       }
+       log.Debugf("send 'Ping' message to subscriber[%s], consumer: %s",
+               remoteAddr, wh.ConsumerID)
+       return nil
+}
+
+func (wh *WebSocket) WritePingPong(messageType int) error {
+       return wh.Conn.WriteControl(messageType, []byte{}, 
time.Now().Add(wh.SendTimeout))
 }
 
-func (wh *WebSocket) WriteMessage(message []byte) error {
-       err := wh.conn.SetWriteDeadline(time.Now().Add(wh.SendTimeout()))
+func (wh *WebSocket) WriteTextMessage(message []byte) error {
+       err := wh.Conn.SetWriteDeadline(time.Now().Add(wh.SendTimeout))
        if err != nil {
                return err
        }
-       return wh.conn.WriteMessage(websocket.TextMessage, message)
+       err = wh.Conn.WriteMessage(websocket.TextMessage, message)
+       if err != nil {
+               log.Errorf(err, "subscriber[%s] catch an err, msg size: %d",
+                       wh.Conn.RemoteAddr().String(), len(message))
+       }
+       return err
 }
 
-func (wh *WebSocket) Ready() <-chan struct{} {
-       return wh.free
+func (wh *WebSocket) Idle() <-chan struct{} {
+       return wh.idleCh
 }
 
-func (wh *WebSocket) SetReady() {
+func (wh *WebSocket) SetIdle() {
        select {
-       case wh.free <- struct{}{}:
+       case wh.idleCh <- struct{}{}:
        default:
        }
 }
 
-func (wh *WebSocket) Stop() {
-       close(wh.closed)
-}
-
-func ListAndWatch(ctx context.Context, serviceID string, f func() 
([]*pb.WatchInstanceResponse, int64), conn *websocket.Conn) {
-       domainProject := util.ParseDomainProject(ctx)
-       domain := util.ParseDomain(ctx)
-       socket := New(ctx, conn, event.NewInstanceEventListWatcher(serviceID, 
domainProject, f))
-
-       metrics.ReportSubscriber(domain, Websocket, 1)
-       process(socket)
-       metrics.ReportSubscriber(domain, Websocket, -1)
-}
-
-func process(socket *WebSocket) {
-       if err := socket.Init(); err != nil {
-               return
-       }
-
-       socket.HandleControlMessage()
-
-       socket.Stop()
-}
-
-func SendEstablishError(conn *websocket.Conn, err error) {
-       remoteAddr := conn.RemoteAddr().String()
-       log.Errorf(err, "establish[%s] websocket watch failed.", remoteAddr)
-       if err := conn.WriteMessage(websocket.TextMessage, 
util.StringToBytesWithNoCopy(err.Error())); err != nil {
-               log.Errorf(err, "establish[%s] websocket watch failed: write 
message failed.", remoteAddr)
-       }
-}
-
-func New(ctx context.Context, conn *websocket.Conn, watcher 
*event.InstanceEventListWatcher) *WebSocket {
-       return &WebSocket{
-               ctx:     ctx,
-               conn:    conn,
-               watcher: watcher,
+func NewWebSocket(domainProject, serviceID string, conn *websocket.Conn) 
*WebSocket {
+       ws := &WebSocket{
+               Options:       ToOptions(),
+               DomainProject: domainProject,
+               ConsumerID:    serviceID,
+               Conn:          conn,
        }
+       ws.Init()
+       return ws
 }
diff --git a/server/connection/ws/websocket_test.go 
b/server/connection/ws/websocket_test.go
index 7f21727..ca40bd6 100644
--- a/server/connection/ws/websocket_test.go
+++ b/server/connection/ws/websocket_test.go
@@ -16,12 +16,10 @@
  */
 package ws_test
 
-// initialize
 import (
        _ "github.com/apache/servicecomb-service-center/test"
 
        "context"
-       "errors"
        "net/http"
        "net/http/httptest"
        "strings"
@@ -31,104 +29,136 @@ import (
        wss "github.com/apache/servicecomb-service-center/server/connection/ws"
        "github.com/apache/servicecomb-service-center/server/core"
        "github.com/apache/servicecomb-service-center/server/event"
-       "github.com/go-chassis/cari/discovery"
        "github.com/gorilla/websocket"
+       "github.com/stretchr/testify/assert"
 )
 
 var closeCh = make(chan struct{})
 
-type watcherConn struct {
-}
-
 func init() {
        testing.Init()
        core.Initialize()
 }
+
+type watcherConn struct {
+       ClientConn *websocket.Conn
+       ServerConn *websocket.Conn
+}
+
+func (h *watcherConn) Test() {
+       s := httptest.NewServer(h)
+       h.ClientConn, _, _ = websocket.DefaultDialer.Dial(
+               strings.Replace(s.URL, "http://";, "ws://", 1), nil)
+}
+
 func (h *watcherConn) ServeHTTP(w http.ResponseWriter, r *http.Request) {
        var upgrader = websocket.Upgrader{}
-       conn, _ := upgrader.Upgrade(w, r, nil)
+       h.ServerConn, _ = upgrader.Upgrade(w, r, nil)
        for {
-               conn.WriteControl(websocket.PingMessage, []byte{}, 
time.Now().Add(time.Second))
-               conn.WriteControl(websocket.PongMessage, []byte{}, 
time.Now().Add(time.Second))
-               _, _, err := conn.ReadMessage()
+               //h.ServerConn.WriteControl(websocket.PingMessage, []byte{}, 
time.Now().Add(time.Second))
+               //h.ServerConn.WriteControl(websocket.PongMessage, []byte{}, 
time.Now().Add(time.Second))
+               _, _, err := h.ServerConn.ReadMessage()
                if err != nil {
                        return
                }
                <-closeCh
-               conn.WriteControl(websocket.CloseMessage, []byte{}, 
time.Now().Add(time.Second))
-               conn.Close()
+               h.ServerConn.WriteControl(websocket.CloseMessage, []byte{}, 
time.Now().Add(time.Second))
+               h.ServerConn.Close()
                return
        }
 }
 
-func TestDoWebSocketListAndWatch(t *testing.T) {
-       s := httptest.NewServer(&watcherConn{})
-
-       conn, _, _ := websocket.DefaultDialer.Dial(
-               strings.Replace(s.URL, "http://";, "ws://", 1), nil)
-
-       wss.SendEstablishError(conn, errors.New("error"))
+func NewTest() *watcherConn {
+       ts := &watcherConn{}
+       ts.Test()
+       return ts
+}
 
-       w := event.NewInstanceEventListWatcher("g", "s", func() (results 
[]*discovery.WatchInstanceResponse, rev int64) {
-               results = append(results, &discovery.WatchInstanceResponse{
-                       Response: 
discovery.CreateResponse(discovery.ResponseSuccess, "ok"),
-                       Action:   string(discovery.EVT_CREATE),
-                       Key:      &discovery.MicroServiceKey{},
-                       Instance: &discovery.MicroServiceInstance{},
-               })
-               return
+func TestNewWebSocket(t *testing.T) {
+       mock := NewTest()
+       t.Run("should return not nil when new", func(t *testing.T) {
+               assert.NotNil(t, wss.NewWebSocket("", "", mock.ServerConn))
        })
+}
 
-       ws := wss.New(context.Background(), conn, w)
-       err := ws.Init()
-       if err != nil {
-               t.Fatalf("TestPublisher_Run")
+func TestWebSocket_NeedCheck(t *testing.T) {
+       mock := NewTest()
+       conn := mock.ServerConn
+       options := wss.ToOptions()
+       webSocket := &wss.WebSocket{
+               Options:       options,
+               DomainProject: "default",
+               ConsumerID:    "",
+               Conn:          conn,
        }
 
-       event.Center().Start()
-
-       go func() {
-               wss.ListAndWatch(context.Background(), "", nil, conn)
-
-               w2 := event.NewInstanceEventListWatcher("g", "s", func() 
(results []*discovery.WatchInstanceResponse, rev int64) {
-                       return
-               })
-               ws2 := wss.New(context.Background(), conn, w2)
-               err := ws2.Init()
-               if err != nil {
-                       t.Fatalf("TestPublisher_Run")
-               }
-       }()
-
-       go ws.HandleControlMessage()
-
-       w.OnMessage(nil)
-       w.OnMessage(&event.InstanceEvent{})
-
-       event.Center().Fire(event.NewInstanceEvent("g", "s", 1, 
&discovery.WatchInstanceResponse{
-               Response: discovery.CreateResponse(discovery.ResponseSuccess, 
"ok"),
-               Action:   string(discovery.EVT_CREATE),
-               Key:      &discovery.MicroServiceKey{},
-               Instance: &discovery.MicroServiceInstance{},
-       }))
-
-       <-time.After(time.Second)
-
-       ws.HandleEvent(nil)
-
-       ws.Heartbeat(websocket.PingMessage)
-       ws.Heartbeat(websocket.PongMessage)
-
-       ws.HandleEvent(time.Now())
+       t.Run("should not check when new", func(t *testing.T) {
+               webSocket.HealthInterval = time.Second
+               webSocket.Init()
+               assert.Nil(t, webSocket.NeedCheck())
+       })
 
-       closeCh <- struct{}{}
+       t.Run("should check when check time up", func(t *testing.T) {
+               webSocket.HealthInterval = time.Microsecond
+               webSocket.Init()
+               <-time.After(time.Microsecond)
+               assert.NotNil(t, webSocket.NeedCheck())
+       })
+       t.Run("should not check when busy", func(t *testing.T) {
+               webSocket.HealthInterval = time.Microsecond
+               webSocket.Init()
+               <-time.After(time.Microsecond)
+               assert.NotNil(t, webSocket.NeedCheck())
+               assert.Nil(t, webSocket.NeedCheck())
+       })
+}
 
-       <-time.After(time.Second)
+func TestWebSocket_Idle(t *testing.T) {
+       mock := NewTest()
+       webSocket := wss.NewWebSocket("", "", mock.ServerConn)
 
-       ws.Heartbeat(websocket.PingMessage)
-       ws.Heartbeat(websocket.PongMessage)
+       t.Run("should idle when new", func(t *testing.T) {
+               select {
+               case <-webSocket.Idle():
+               default:
+                       assert.Fail(t, "not idle")
+               }
+       })
+       t.Run("should idle when setIdle", func(t *testing.T) {
+               select {
+               case <-webSocket.Idle():
+                       assert.Fail(t, "idle")
+               default:
+                       webSocket.SetIdle()
+                       select {
+                       case <-webSocket.Idle():
+                       default:
+                               assert.Fail(t, "not idle")
+                       }
+               }
+       })
+       t.Run("should idle when checkHealth", func(t *testing.T) {
+               _ = webSocket.CheckHealth(context.Background())
+               select {
+               case <-webSocket.Idle():
+               default:
+                       assert.Fail(t, "not idle")
+               }
+       })
+}
 
-       w.OnMessage(nil)
+func TestWebSocket_CheckHealth(t *testing.T) {
+       mock := NewTest()
+       event.Center().Start()
 
-       wss.Instance().Stop()
+       t.Run("should do nothing when recv PING", func(t *testing.T) {
+               ws := wss.NewWebSocket("", "", mock.ServerConn)
+               mock.ClientConn.WriteControl(websocket.PingMessage, []byte{}, 
time.Now().Add(time.Second))
+               <-time.After(time.Second)
+               assert.Nil(t, ws.CheckHealth(context.Background()))
+       })
+       t.Run("should return err when consumer not exist", func(t *testing.T) {
+               ws := wss.NewWebSocket("", "", mock.ServerConn)
+               assert.Equal(t, "Service does not exist.", 
ws.CheckHealth(context.Background()).Error())
+       })
 }
diff --git a/server/event/instance_event.go b/server/event/instance_event.go
new file mode 100644
index 0000000..74f30fc
--- /dev/null
+++ b/server/event/instance_event.go
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package event
+
+import (
+       "github.com/apache/servicecomb-service-center/pkg/event"
+       simple "github.com/apache/servicecomb-service-center/pkg/time"
+       pb "github.com/go-chassis/cari/discovery"
+)
+
+const QueueSize = 5000
+
+var INSTANCE = event.RegisterType("INSTANCE", QueueSize)
+
+// 状态变化推送
+type InstanceEvent struct {
+       event.Event
+       Revision int64
+       Response *pb.WatchInstanceResponse
+}
+
+func NewInstanceEvent(serviceID, domainProject string, rev int64, response 
*pb.WatchInstanceResponse) *InstanceEvent {
+       return &InstanceEvent{
+               Event:    event.NewEvent(INSTANCE, domainProject, serviceID),
+               Revision: rev,
+               Response: response,
+       }
+}
+
+func NewInstanceEventWithTime(serviceID, domainProject string, rev int64, 
createAt simple.Time, response *pb.WatchInstanceResponse) *InstanceEvent {
+       return &InstanceEvent{
+               Event:    event.NewEventWithTime(INSTANCE, domainProject, 
serviceID, createAt),
+               Revision: rev,
+               Response: response,
+       }
+}
diff --git a/server/event/instance_subscriber.go 
b/server/event/instance_subscriber.go
index f61f622..9d5c1c4 100644
--- a/server/event/instance_subscriber.go
+++ b/server/event/instance_subscriber.go
@@ -18,39 +18,20 @@
 package event
 
 import (
-       "context"
-       "time"
-
+       "fmt"
        "github.com/apache/servicecomb-service-center/pkg/event"
-       "github.com/apache/servicecomb-service-center/pkg/gopool"
        "github.com/apache/servicecomb-service-center/pkg/log"
-       simple "github.com/apache/servicecomb-service-center/pkg/time"
-       pb "github.com/go-chassis/cari/discovery"
-)
-
-const (
-       AddJobTimeout  = 1 * time.Second
-       EventQueueSize = 5000
+       "github.com/apache/servicecomb-service-center/server/metrics"
 )
 
-var INSTANCE = event.RegisterType("INSTANCE", EventQueueSize)
-
-// 状态变化推送
-type InstanceEvent struct {
-       event.Event
-       Revision int64
-       Response *pb.WatchInstanceResponse
-}
+var errBusy = fmt.Errorf("too busy")
 
-type InstanceEventListWatcher struct {
+type InstanceSubscriber struct {
        event.Subscriber
-       Job          chan *InstanceEvent
-       ListRevision int64
-       ListFunc     func() (results []*pb.WatchInstanceResponse, rev int64)
-       listCh       chan struct{}
+       Job chan *InstanceEvent
 }
 
-func (w *InstanceEventListWatcher) SetError(err error) {
+func (w *InstanceSubscriber) SetError(err error) {
        w.Subscriber.SetError(err)
        // 触发清理job
        e := w.Bus().Fire(event.NewUnhealthyEvent(w))
@@ -59,108 +40,64 @@ func (w *InstanceEventListWatcher) SetError(err error) {
        }
 }
 
-func (w *InstanceEventListWatcher) OnAccept() {
+func (w *InstanceSubscriber) OnAccept() {
        if w.Err() != nil {
                return
        }
        log.Debugf("accepted by event service, %s watcher %s %s", w.Type(), 
w.Group(), w.Subject())
-       gopool.Go(w.listAndPublishJobs)
-}
-
-func (w *InstanceEventListWatcher) listAndPublishJobs(_ context.Context) {
-       defer close(w.listCh)
-       if w.ListFunc == nil {
-               return
-       }
-       results, rev := w.ListFunc()
-       w.ListRevision = rev
-       for _, response := range results {
-               w.sendMessage(NewInstanceEvent(w.Group(), w.Subject(), 
w.ListRevision, response))
-       }
 }
 
 //被通知
-func (w *InstanceEventListWatcher) OnMessage(job event.Event) {
+func (w *InstanceSubscriber) OnMessage(evt event.Event) {
        if w.Err() != nil {
                return
        }
 
-       wJob, ok := job.(*InstanceEvent)
+       wJob, ok := evt.(*InstanceEvent)
        if !ok {
                return
        }
-
-       select {
-       case <-w.listCh:
-       default:
-               timer := time.NewTimer(w.Timeout())
-               select {
-               case <-w.listCh:
-                       timer.Stop()
-               case <-timer.C:
-                       log.Errorf(nil,
-                               "the %s listwatcher %s %s is not ready[over 
%s], send the event %v",
-                               w.Type(), w.Group(), w.Subject(), w.Timeout(), 
job)
-               }
-       }
-
-       // the negative revision is specially for mongo scene,should be removed 
after mongo support revison.
-       if wJob.Revision >= 0 && wJob.Revision <= w.ListRevision {
-               log.Warnf("unexpected event %s job is coming in, watcher %s %s, 
job is %v, current revision is %v",
-                       w.Type(), w.Group(), w.Subject(), job, w.ListRevision)
-               return
-       }
        w.sendMessage(wJob)
 }
 
-func (w *InstanceEventListWatcher) sendMessage(job *InstanceEvent) {
+func (w *InstanceSubscriber) sendMessage(evt *InstanceEvent) {
        defer log.Recover()
+
+       metrics.ReportPendingCompleted(evt)
+
        select {
-       case w.Job <- job:
+       case w.Job <- evt:
        default:
-               timer := time.NewTimer(w.Timeout())
-               select {
-               case w.Job <- job:
-                       timer.Stop()
-               case <-timer.C:
-                       log.Errorf(nil,
-                               "the %s watcher %s %s event queue is full[over 
%s], drop the event %v",
-                               w.Type(), w.Group(), w.Subject(), w.Timeout(), 
job)
-               }
+               log.Errorf(nil, "the %s watcher %s %s event queue is full, drop 
the blocked events",
+                       w.Type(), w.Group(), w.Subject())
+               w.cleanup()
+               w.Job <- evt
        }
 }
 
-func (w *InstanceEventListWatcher) Timeout() time.Duration {
-       return AddJobTimeout
-}
-
-func (w *InstanceEventListWatcher) Close() {
-       close(w.Job)
-}
-
-func NewInstanceEvent(serviceID, domainProject string, rev int64, response 
*pb.WatchInstanceResponse) *InstanceEvent {
-       return &InstanceEvent{
-               Event:    event.NewEvent(INSTANCE, domainProject, serviceID),
-               Revision: rev,
-               Response: response,
+func (w *InstanceSubscriber) cleanup() {
+       for {
+               select {
+               case evt, ok := <-w.Job:
+                       if !ok {
+                               return
+                       }
+                       metrics.ReportPublishCompleted(evt, errBusy)
+               default:
+                       return
+               }
        }
 }
 
-func NewInstanceEventWithTime(serviceID, domainProject string, rev int64, 
createAt simple.Time, response *pb.WatchInstanceResponse) *InstanceEvent {
-       return &InstanceEvent{
-               Event:    event.NewEventWithTime(INSTANCE, domainProject, 
serviceID, createAt),
-               Revision: rev,
-               Response: response,
-       }
+func (w *InstanceSubscriber) Close() {
+       w.cleanup()
+       close(w.Job)
 }
 
-func NewInstanceEventListWatcher(serviceID, domainProject string,
-       listFunc func() (results []*pb.WatchInstanceResponse, rev int64)) 
*InstanceEventListWatcher {
-       watcher := &InstanceEventListWatcher{
+func NewInstanceSubscriber(serviceID, domainProject string) 
*InstanceSubscriber {
+       watcher := &InstanceSubscriber{
                Subscriber: event.NewSubscriber(INSTANCE, domainProject, 
serviceID),
                Job:        make(chan *InstanceEvent, INSTANCE.QueueSize()),
-               ListFunc:   listFunc,
-               listCh:     make(chan struct{}),
        }
        return watcher
 }
diff --git a/server/metrics/connection.go b/server/metrics/connection.go
index aea7c97..c012980 100644
--- a/server/metrics/connection.go
+++ b/server/metrics/connection.go
@@ -44,6 +44,23 @@ var (
                        Objectives: metrics.Pxx,
                }, []string{"instance", "source", "status"})
 
+       pendingGauge = helper.NewGaugeVec(
+               prometheus.GaugeOpts{
+                       Namespace: metrics.FamilyName,
+                       Subsystem: "notify",
+                       Name:      "pending_total",
+                       Help:      "Counter of pending instance events",
+               }, []string{"instance", "source"})
+
+       pendingLatency = helper.NewSummaryVec(
+               prometheus.SummaryOpts{
+                       Namespace:  metrics.FamilyName,
+                       Subsystem:  "notify",
+                       Name:       "pending_durations_microseconds",
+                       Help:       "Latency of pending instance events",
+                       Objectives: metrics.Pxx,
+               }, []string{"instance", "source"})
+
        subscriberGauge = helper.NewGaugeVec(
                prometheus.GaugeOpts{
                        Namespace: metrics.FamilyName,
@@ -62,10 +79,17 @@ func ReportPublishCompleted(evt event.Event, err error) {
        }
        notifyLatency.WithLabelValues(instance, evt.Type().String(), 
status).Observe(elapsed)
        notifyCounter.WithLabelValues(instance, evt.Type().String(), 
status).Inc()
+       pendingGauge.WithLabelValues(instance, evt.Type().String()).Dec()
 }
 
-func ReportSubscriber(domain, scheme string, n float64) {
+func ReportPendingCompleted(evt event.Event) {
        instance := metrics.InstanceName()
+       elapsed := float64(time.Since(evt.CreateAt()).Nanoseconds()) / 
float64(time.Microsecond)
+       pendingLatency.WithLabelValues(instance, 
evt.Type().String()).Observe(elapsed)
+       pendingGauge.WithLabelValues(instance, evt.Type().String()).Inc()
+}
 
+func ReportSubscriber(domain, scheme string, n float64) {
+       instance := metrics.InstanceName()
        subscriberGauge.WithLabelValues(instance, domain, scheme).Add(n)
 }
diff --git a/server/rest/controller/v3/instance_watcher.go 
b/server/rest/controller/v3/instance_watcher.go
index 7395c4c..6d0690b 100644
--- a/server/rest/controller/v3/instance_watcher.go
+++ b/server/rest/controller/v3/instance_watcher.go
@@ -28,6 +28,5 @@ type WatchService struct {
 func (this *WatchService) URLPatterns() []rest.Route {
        return []rest.Route{
                {rest.HTTPMethodGet, 
"/registry/v3/microservices/:serviceId/watcher", this.Watch},
-               {rest.HTTPMethodGet, 
"/registry/v3/microservices/:serviceId/listwatcher", this.ListAndWatch},
        }
 }
diff --git a/server/rest/controller/v4/instance_watcher.go 
b/server/rest/controller/v4/instance_watcher.go
index e6de6f3..9396ee5 100644
--- a/server/rest/controller/v4/instance_watcher.go
+++ b/server/rest/controller/v4/instance_watcher.go
@@ -34,7 +34,6 @@ type WatchService struct {
 func (s *WatchService) URLPatterns() []rest.Route {
        return []rest.Route{
                {Method: rest.HTTPMethodGet, Path: 
"/v4/:project/registry/microservices/:serviceId/watcher", Func: s.Watch},
-               {Method: rest.HTTPMethodGet, Path: 
"/v4/:project/registry/microservices/:serviceId/listwatcher", Func: 
s.ListAndWatch},
        }
 }
 
@@ -63,16 +62,3 @@ func (s *WatchService) Watch(w http.ResponseWriter, r 
*http.Request) {
                SelfServiceId: r.URL.Query().Get(":serviceId"),
        }, conn)
 }
-
-func (s *WatchService) ListAndWatch(w http.ResponseWriter, r *http.Request) {
-       conn, err := upgrade(w, r)
-       if err != nil {
-               return
-       }
-       defer conn.Close()
-
-       r.Method = "WATCHLIST"
-       core.InstanceAPI.WebSocketListAndWatch(r.Context(), 
&pb.WatchInstanceRequest{
-               SelfServiceId: r.URL.Query().Get(":serviceId"),
-       }, conn)
-}
diff --git a/server/service/watch.go b/server/service/watch.go
index b3c00df..8813b0f 100644
--- a/server/service/watch.go
+++ b/server/service/watch.go
@@ -54,7 +54,7 @@ func (s *InstanceService) Watch(in *pb.WatchInstanceRequest, 
stream proto.Servic
                return err
        }
 
-       return grpc.ListAndWatch(stream.Context(), in.SelfServiceId, nil, 
stream)
+       return grpc.Watch(stream.Context(), in.SelfServiceId, stream)
 }
 
 func (s *InstanceService) WebSocketWatch(ctx context.Context, in 
*pb.WatchInstanceRequest, conn *websocket.Conn) {
@@ -63,18 +63,7 @@ func (s *InstanceService) WebSocketWatch(ctx 
context.Context, in *pb.WatchInstan
                ws.SendEstablishError(conn, err)
                return
        }
-       ws.ListAndWatch(ctx, in.SelfServiceId, nil, conn)
-}
-
-func (s *InstanceService) WebSocketListAndWatch(ctx context.Context, in 
*pb.WatchInstanceRequest, conn *websocket.Conn) {
-       log.Infof("new a web socket list and watch with service[%s]", 
in.SelfServiceId)
-       if err := s.WatchPreOpera(ctx, in); err != nil {
-               ws.SendEstablishError(conn, err)
-               return
-       }
-       ws.ListAndWatch(ctx, in.SelfServiceId, func() 
([]*pb.WatchInstanceResponse, int64) {
-               return s.QueryAllProvidersInstances(ctx, in)
-       }, conn)
+       ws.Watch(ctx, in.SelfServiceId, conn)
 }
 
 func (s *InstanceService) QueryAllProvidersInstances(ctx context.Context, in 
*pb.WatchInstanceRequest) ([]*pb.WatchInstanceResponse, int64) {
diff --git a/server/service/watch_test.go b/server/service/watch_test.go
index b663cc8..a058399 100644
--- a/server/service/watch_test.go
+++ b/server/service/watch_test.go
@@ -46,13 +46,6 @@ func TestInstanceService_WebSocketWatch(t *testing.T) {
        instanceResource.WebSocketWatch(context.Background(), 
&pb.WatchInstanceRequest{}, nil)
 }
 
-func TestInstanceService_WebSocketListAndWatch(t *testing.T) {
-       defer func() {
-               recover()
-       }()
-       instanceResource.WebSocketListAndWatch(context.Background(), 
&pb.WatchInstanceRequest{}, nil)
-}
-
 var _ = Describe("'Instance' service", func() {
        Describe("execute 'watch' operartion", func() {
                var (

Reply via email to