This is an automated email from the ASF dual-hosted git repository.

tianxiaoliang pushed a commit to branch v1.x
in repository https://gitbox.apache.org/repos/asf/servicecomb-service-center.git


The following commit(s) were added to refs/heads/v1.x by this push:
     new b5e162d  SCB-2176 Refactor websocket (#986) (#989)
b5e162d is described below

commit b5e162da02db641385ce45ee278842ab313d10ea
Author: little-cui <[email protected]>
AuthorDate: Fri May 21 08:36:18 2021 +0800

    SCB-2176 Refactor websocket (#986) (#989)
    
    * SCB-2176 Refactor websocket (#986)
    
    * SCB-2176 Refactor event bus
    
    * SCB-2176 Fix: UT failures
    
    * SCB-2176 Refactor websocket (#980)
    
    * SCB-2176 Refactor websocket
    
    * SCB-2176 Add UTs
    
    * SCB-2176 Fix: UT failures
    
    * SCB-2176 Fix: websocket health check failed
    
    * SCB-2176 Add benchmark for 10K connection
    
    * SCB-2176 Add benchmark for 10K connection
    
    * SCB-2176 Add event metrics
    
    (cherry picked from commit cc0f684449f6606c45c86d53508428a50eb32b84)
    
    * SCB-2176 Fix: export metrics APIs
    
    * SCB-2176 Resolve comments
---
 examples/infrastructures/docker/README.md       |   2 +-
 go.mod                                          |   1 +
 go.sum                                          |   3 +
 integration/health-metrics-grafana.json         | 174 ++++++++++++++++--------
 integration/instances_test.go                   | 127 ++++++++++++++++-
 pkg/util/context.go                             |  14 +-
 server/connection/metrics.go                    |  27 +++-
 server/connection/ws/websocket.go               |  24 +---
 server/connection/ws/websocket_test.go          |   2 +-
 server/event/instance_subscriber.go             |  44 +++---
 server/metric/prometheus/metrics.go             |   5 +
 server/metric/prometheus/reporter.go            |   5 +-
 server/plugin/discovery/etcd/cacher_kv.go       |   1 +
 server/plugin/discovery/metrics.go              |  34 ++++-
 server/plugin/discovery/servicecenter/common.go |  30 ++--
 server/service/event/instance_event_handler.go  |  47 ++++---
 server/service/util/microservice_util.go        |   9 --
 17 files changed, 404 insertions(+), 145 deletions(-)

diff --git a/examples/infrastructures/docker/README.md 
b/examples/infrastructures/docker/README.md
index 1955c64..90c3b72 100644
--- a/examples/infrastructures/docker/README.md
+++ b/examples/infrastructures/docker/README.md
@@ -5,7 +5,7 @@ A simple demo to deploy ServiceCenter in docker environment.
 ## Quick Start
 
 ```bash
-cd $PROJECT_ROOT/integration/docker
+cd examples/infrastructures/docker
 docker-compose up
 ```
 This will start up ServiceCenter listening on `:30100` for handling requests 
and Dashboard listening on `:30103`.
diff --git a/go.mod b/go.mod
index 7d7edd3..7a194dc 100644
--- a/go.mod
+++ b/go.mod
@@ -10,6 +10,7 @@ require (
        github.com/coreos/pkg v0.0.0-20180108230652-97fdf19511ea // v4
        github.com/dgrijalva/jwt-go v3.2.0+incompatible
        github.com/elithrar/simple-scrypt v1.3.0
+       github.com/go-chassis/cari v0.3.0
        github.com/go-chassis/foundation v0.3.0
        github.com/go-chassis/go-archaius v1.3.2
        github.com/go-chassis/go-chassis v0.0.0-20200826064053-d90be848aa10
diff --git a/go.sum b/go.sum
index a04e695..4b9d25b 100644
--- a/go.sum
+++ b/go.sum
@@ -75,6 +75,7 @@ github.com/davecgh/go-spew 
v0.0.0-20151105211317-5215b55f46b2/go.mod h1:J7Y8YcW2
 github.com/davecgh/go-spew v1.1.0/go.mod 
h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
 github.com/davecgh/go-spew v1.1.1 
h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
 github.com/davecgh/go-spew v1.1.1/go.mod 
h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
+github.com/deckarep/golang-set v1.7.1/go.mod 
h1:93vsz/8Wt4joVM7c2AVqh+YRMiUSc14yDtF28KmMOgQ=
 github.com/dgrijalva/jwt-go v3.2.0+incompatible 
h1:7qlOGliEKZXTDg6OTjfoBKDXWrumCAMpl/TFQ4/5kLM=
 github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod 
h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ=
 github.com/docker/spdystream v0.0.0-20160310174837-449fdfce4d96 
h1:cenwrSVm+Z7QLSV/BsnenAOcDXdX4cMv4wP0B/5QbPg=
@@ -109,6 +110,8 @@ github.com/gin-contrib/sse 
v0.0.0-20190301062529-5545eab6dad3 h1:t8FVkw33L+wilf2
 github.com/gin-contrib/sse v0.0.0-20190301062529-5545eab6dad3/go.mod 
h1:VJ0WA2NBN22VlZ2dKZQPAPnyWw5XTlK1KymzLKsr59s=
 github.com/gin-gonic/gin v1.4.0 h1:3tMoCCfM7ppqsR0ptz/wi1impNpT7/9wQtMZ8lr1mCQ=
 github.com/gin-gonic/gin v1.4.0/go.mod 
h1:OW2EZn3DO8Ln9oIKOvM++LBO+5UPHJJDH72/q/3rZdM=
+github.com/go-chassis/cari v0.3.0 
h1:ysEX1t9dBObshebFKca3zhrWFqyPvcIZo2r66IyJjuk=
+github.com/go-chassis/cari v0.3.0/go.mod 
h1:Ie2lW11Y5ZFClY9z7bhAwK6BoNxqGSf3fYGs4mPFs74=
 github.com/go-chassis/foundation v0.1.1-0.20191113114104-2b05871e9ec4/go.mod 
h1:21/ajGtgJlWTCeM0TxGJdRhO8bJkKirWyV8Stlh6g6c=
 github.com/go-chassis/foundation v0.1.1-0.20200825060850-b16bf420f7b3/go.mod 
h1:21/ajGtgJlWTCeM0TxGJdRhO8bJkKirWyV8Stlh6g6c=
 github.com/go-chassis/foundation v0.3.0 
h1:jG4BIrK8fXD9jbTtJ5rOLGQZ1pQI/mLnDuVJzToCtos=
diff --git a/integration/health-metrics-grafana.json 
b/integration/health-metrics-grafana.json
index f730b2c..84f963b 100644
--- a/integration/health-metrics-grafana.json
+++ b/integration/health-metrics-grafana.json
@@ -1,18 +1,3 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License
-
 {
   "__inputs": [
     {
@@ -2183,7 +2168,7 @@
       "steppedLine": false,
       "targets": [
         {
-          "expr": 
"sum(irate(service_center_db_backend_event_total{job=\"service-center\"}[1m])) 
by (instance,prefix)",
+          "expr": 
"sum(irate(service_center_db_dispatch_event_total{job=\"service-center\"}[1m])) 
by (instance,prefix)",
           "format": "time_series",
           "intervalFactor": 1,
           "legendFormat": "{{instance}}> {{prefix}}",
@@ -2193,7 +2178,7 @@
       "thresholds": [],
       "timeFrom": null,
       "timeShift": null,
-      "title": "Publish Events",
+      "title": "Produce Events",
       "tooltip": {
         "shared": true,
         "sort": 0,
@@ -2277,7 +2262,7 @@
       "steppedLine": false,
       "targets": [
         {
-          "expr": 
"max(avg_over_time(service_center_db_backend_event_durations_microseconds{job=\"service-center\"}[1m]))
 by (instance, prefix)",
+          "expr": 
"max(avg_over_time(service_center_db_dispatch_event_durations_microseconds{job=\"service-center\"}[1m]))
 by (instance, prefix)",
           "format": "time_series",
           "intervalFactor": 1,
           "legendFormat": "{{instance}}> {{prefix}}",
@@ -2287,7 +2272,7 @@
       "thresholds": [],
       "timeFrom": null,
       "timeShift": null,
-      "title": "Publish Events Latency",
+      "title": "Produce Events Latency",
       "tooltip": {
         "shared": true,
         "sort": 0,
@@ -2381,7 +2366,7 @@
       "thresholds": [],
       "timeFrom": null,
       "timeShift": null,
-      "title": "Subscribe Events",
+      "title": "Post Events",
       "tooltip": {
         "shared": true,
         "sort": 0,
@@ -2475,7 +2460,7 @@
       "thresholds": [],
       "timeFrom": null,
       "timeShift": null,
-      "title": "Subscribe Events Latency",
+      "title": "Post Events Latency",
       "tooltip": {
         "shared": true,
         "sort": 0,
@@ -2526,8 +2511,91 @@
         "x": 0,
         "y": 40
       },
+      "id": 48,
+      "legend": {
+        "avg": false,
+        "current": false,
+        "max": false,
+        "min": false,
+        "show": false,
+        "total": false,
+        "values": false
+      },
+      "lines": true,
+      "linewidth": 1,
+      "links": [],
+      "nullPointMode": "null",
+      "percentage": false,
+      "pointradius": 5,
+      "points": false,
+      "renderer": "flot",
+      "seriesOverrides": [],
+      "spaceLength": 10,
+      "stack": false,
+      "steppedLine": false,
+      "targets": [
+        {
+          "expr": 
"sum(service_center_notify_pending_total{job=\"service-center\"}) by 
(instance,source)",
+          "format": "time_series",
+          "intervalFactor": 1,
+          "refId": "A"
+        }
+      ],
+      "thresholds": [],
+      "timeFrom": null,
+      "timeShift": null,
+      "title": "Pending Events",
+      "tooltip": {
+        "shared": true,
+        "sort": 0,
+        "value_type": "individual"
+      },
+      "type": "graph",
+      "xaxis": {
+        "buckets": null,
+        "mode": "time",
+        "name": null,
+        "show": true,
+        "values": []
+      },
+      "yaxes": [
+        {
+          "format": "short",
+          "label": null,
+          "logBase": 1,
+          "max": null,
+          "min": null,
+          "show": true
+        },
+        {
+          "format": "short",
+          "label": null,
+          "logBase": 1,
+          "max": null,
+          "min": null,
+          "show": true
+        }
+      ],
+      "yaxis": {
+        "align": false,
+        "alignLevel": null
+      }
+    },
+    {
+      "aliasColors": {},
+      "bars": false,
+      "dashLength": 10,
+      "dashes": false,
+      "datasource": "${DS_LOCAL}",
+      "fill": 1,
+      "gridPos": {
+        "h": 6,
+        "w": 6,
+        "x": 6,
+        "y": 40
+      },
       "height": "",
-      "id": 37,
+      "id": 43,
       "legend": {
         "alignAsTable": false,
         "avg": false,
@@ -2558,18 +2626,17 @@
       "steppedLine": false,
       "targets": [
         {
-          "expr": 
"sum(irate(service_center_db_backend_operation_total{job=\"service-center\"}[1m]))
 by (instance,operation)",
+          "expr": 
"service_center_notify_subscriber_total{job=\"service-center\"}",
           "format": "time_series",
-          "instant": false,
           "intervalFactor": 1,
-          "legendFormat": "{{instance}}> {{operation}}",
-          "refId": "A"
+          "legendFormat": "{{instance}}> {{domain}} {{scheme}}",
+          "refId": "B"
         }
       ],
       "thresholds": [],
       "timeFrom": null,
       "timeShift": null,
-      "title": "Backend Operations",
+      "title": "Subscribers",
       "tooltip": {
         "shared": true,
         "sort": 0,
@@ -2587,7 +2654,7 @@
       "yaxes": [
         {
           "decimals": 0,
-          "format": "ops",
+          "format": "none",
           "label": null,
           "logBase": 1,
           "max": null,
@@ -2618,25 +2685,25 @@
       "gridPos": {
         "h": 6,
         "w": 6,
-        "x": 6,
+        "x": 12,
         "y": 40
       },
       "height": "",
-      "id": 38,
+      "id": 37,
       "legend": {
         "alignAsTable": false,
-        "avg": true,
+        "avg": false,
         "current": false,
         "hideEmpty": true,
         "hideZero": true,
-        "max": true,
-        "min": true,
-        "rightSide": true,
+        "max": false,
+        "min": false,
+        "rightSide": false,
         "show": false,
         "sort": "avg",
         "sortDesc": true,
         "total": false,
-        "values": true
+        "values": false
       },
       "lines": true,
       "linewidth": 1,
@@ -2653,17 +2720,18 @@
       "steppedLine": false,
       "targets": [
         {
-          "expr": 
"max(avg_over_time(service_center_db_backend_operation_durations_microseconds{job=\"service-center\"}[1m]))
 by (instance,operation)",
+          "expr": 
"sum(irate(service_center_db_backend_operation_total{job=\"service-center\"}[1m]))
 by (instance,operation)",
           "format": "time_series",
+          "instant": false,
           "intervalFactor": 1,
           "legendFormat": "{{instance}}> {{operation}}",
-          "refId": "B"
+          "refId": "A"
         }
       ],
       "thresholds": [],
       "timeFrom": null,
       "timeShift": null,
-      "title": "Backend Operations Latency",
+      "title": "Backend Operations",
       "tooltip": {
         "shared": true,
         "sort": 0,
@@ -2680,7 +2748,8 @@
       },
       "yaxes": [
         {
-          "format": "µs",
+          "decimals": 0,
+          "format": "ops",
           "label": null,
           "logBase": 1,
           "max": null,
@@ -2711,25 +2780,25 @@
       "gridPos": {
         "h": 6,
         "w": 6,
-        "x": 12,
+        "x": 18,
         "y": 40
       },
       "height": "",
-      "id": 43,
+      "id": 38,
       "legend": {
         "alignAsTable": false,
-        "avg": false,
+        "avg": true,
         "current": false,
         "hideEmpty": true,
         "hideZero": true,
-        "max": false,
-        "min": false,
-        "rightSide": false,
+        "max": true,
+        "min": true,
+        "rightSide": true,
         "show": false,
         "sort": "avg",
         "sortDesc": true,
         "total": false,
-        "values": false
+        "values": true
       },
       "lines": true,
       "linewidth": 1,
@@ -2746,17 +2815,17 @@
       "steppedLine": false,
       "targets": [
         {
-          "expr": 
"service_center_notify_subscriber_total{job=\"service-center\"}",
+          "expr": 
"max(avg_over_time(service_center_db_backend_operation_durations_microseconds{job=\"service-center\"}[1m]))
 by (instance,operation)",
           "format": "time_series",
           "intervalFactor": 1,
-          "legendFormat": "{{instance}}> {{domain}} {{scheme}}",
+          "legendFormat": "{{instance}}> {{operation}}",
           "refId": "B"
         }
       ],
       "thresholds": [],
       "timeFrom": null,
       "timeShift": null,
-      "title": "Subscribers",
+      "title": "Backend Operations Latency",
       "tooltip": {
         "shared": true,
         "sort": 0,
@@ -2773,8 +2842,7 @@
       },
       "yaxes": [
         {
-          "decimals": 0,
-          "format": "none",
+          "format": "µs",
           "label": null,
           "logBase": 1,
           "max": null,
@@ -3434,7 +3502,7 @@
     "list": []
   },
   "time": {
-    "from": "now-5m",
+    "from": "now-30m",
     "to": "now"
   },
   "timepicker": {
@@ -3465,5 +3533,5 @@
   "timezone": "",
   "title": "ServiceCenter",
   "uid": "Zg6NoHGiz",
-  "version": 12
+  "version": 17
 }
\ No newline at end of file
diff --git a/integration/instances_test.go b/integration/instances_test.go
index b84fbc4..fcc9108 100644
--- a/integration/instances_test.go
+++ b/integration/instances_test.go
@@ -18,14 +18,19 @@ package integrationtest_test
 
 import (
        "encoding/json"
-       . "github.com/onsi/ginkgo"
-       . "github.com/onsi/gomega"
-       "github.com/widuu/gojson"
+       "fmt"
+       "github.com/go-chassis/cari/discovery"
+       "github.com/gorilla/websocket"
+       "github.com/stretchr/testify/assert"
        "net/http"
        "strings"
+       "sync"
 
        "bytes"
        . "github.com/apache/servicecomb-service-center/integration"
+       . "github.com/onsi/ginkgo"
+       . "github.com/onsi/gomega"
+       "github.com/widuu/gojson"
        "io/ioutil"
        "math/rand"
        "strconv"
@@ -691,3 +696,119 @@ func BenchmarkRegisterMicroServiceInstance(b *testing.B) {
                Expect(resp.StatusCode).To(Equal(http.StatusOK))
        }
 }
+
+func BenchmarkInstanceWatch(t *testing.B) {
+       scclient = insecurityConnection
+       var serviceId, instanceId string
+
+       t.Run("prepare data", func(t *testing.B) {
+               // service
+               serviceName := "testInstance" + strconv.Itoa(rand.Int())
+               servicemap := map[string]interface{}{
+                       "serviceName": serviceName,
+                       "appId":       "testApp",
+                       "version":     "1.0",
+               }
+               bodyParams := map[string]interface{}{
+                       "service": servicemap,
+               }
+               body, _ := json.Marshal(bodyParams)
+               bodyBuf := bytes.NewReader(body)
+               req, _ := http.NewRequest(POST, SCURL+REGISTERMICROSERVICE, 
bodyBuf)
+               req.Header.Set("X-Domain-Name", "default")
+               resp, err := scclient.Do(req)
+               assert.NoError(t, err)
+               assert.Equal(t, http.StatusOK, resp.StatusCode)
+               respbody, _ := ioutil.ReadAll(resp.Body)
+               serviceId = 
gojson.Json(string(respbody)).Get("serviceId").Tostring()
+               resp.Body.Close()
+
+               // instance
+               healthcheck := map[string]interface{}{
+                       "mode":     "push",
+                       "interval": 30000,
+                       "times":    20000,
+               }
+               instance := map[string]interface{}{
+                       "hostName":    "cse",
+                       "healthCheck": healthcheck,
+               }
+               bodyParams = map[string]interface{}{
+                       "instance": instance,
+               }
+               body, _ = json.Marshal(bodyParams)
+               bodyBuf = bytes.NewReader(body)
+               req, _ = http.NewRequest(POST, 
SCURL+strings.Replace(REGISTERINSTANCE, ":serviceId", serviceId, 1), bodyBuf)
+               req.Header.Set("X-Domain-Name", "default")
+               resp, err = scclient.Do(req)
+               assert.NoError(t, err)
+               respbody, _ = ioutil.ReadAll(resp.Body)
+               instanceId = 
gojson.Json(string(respbody)).Get("instanceId").Tostring()
+               resp.Body.Close()
+
+               req, _ = http.NewRequest(GET, 
SCURL+FINDINSTANCE+"?appId=testApp&serviceName="+serviceName+"&version=1.0", 
nil)
+               req.Header.Set("X-Domain-Name", "default")
+               req.Header.Set("X-ConsumerId", serviceId)
+               resp, err = scclient.Do(req)
+               assert.NoError(t, err)
+               resp.Body.Close()
+
+               assert.Equal(t, http.StatusOK, resp.StatusCode)
+       })
+
+       t.Run("test 10K connection", func(t *testing.B) {
+
+               const N, E = 2500, 2500
+               var okWg sync.WaitGroup
+               okWg.Add(N)
+
+               t.Run("new 10K connection", func(t *testing.B) {
+                       url := strings.ReplaceAll(strings.ReplaceAll(SCURL, 
"http://", "ws://")+INSTANCEWATCHER, ":serviceId", serviceId)
+                       for i := 0; i < N; i++ {
+                               go func() {
+                                       conn, _, err := 
websocket.DefaultDialer.Dial(url, nil)
+                                       assert.NoError(t, err)
+                                       for {
+                                               _ = 
conn.SetReadDeadline(time.Now().Add(30 * time.Second))
+                                               _, data, err := 
conn.ReadMessage()
+                                               if err != nil {
+                                                       okWg.Done()
+                                                       return
+                                               }
+
+                                               var response 
discovery.WatchInstanceResponse
+                                               _ = json.Unmarshal(data, 
&response)
+                                               instance := response.Instance
+                                               timestamp, _ := 
strconv.ParseInt(instance.ModTimestamp, 10, 64)
+                                               sub := 
time.Now().Sub(time.Unix(timestamp, 0))
+                                               
fmt.Println(instance.Properties["tag"], sub)
+                                       }
+                               }()
+                       }
+                       <-time.After(10 * time.Second)
+               })
+
+               t.Run("fire 10K event", func(t *testing.B) {
+                       for i := 0; i < E; i++ {
+                               propertiesInstance := map[string]interface{}{
+                                       "tag": strconv.Itoa(i),
+                               }
+                               bodyParams := map[string]interface{}{
+                                       "properties": propertiesInstance,
+                               }
+                               url := strings.Replace(UPDATEINSTANCEMETADATA, 
":serviceId", serviceId, 1)
+                               url = strings.Replace(url, ":instanceId", 
instanceId, 1)
+                               body, _ := json.Marshal(bodyParams)
+                               bodyBuf := bytes.NewReader(body)
+                               req, _ := http.NewRequest(UPDATE, SCURL+url, 
bodyBuf)
+                               req.Header.Set("X-Domain-Name", "default")
+                               resp, _ := scclient.Do(req)
+                               resp.Body.Close()
+                       }
+               })
+
+               t.Run("wait", func(t *testing.B) {
+                       okWg.Wait()
+               })
+       })
+}
diff --git a/pkg/util/context.go b/pkg/util/context.go
index d1d4ceb..2c97d3d 100644
--- a/pkg/util/context.go
+++ b/pkg/util/context.go
@@ -20,6 +20,7 @@ package util
 import (
        "context"
        "net/http"
+       "strings"
        "time"
 )
 
@@ -28,6 +29,7 @@ const (
        CtxProject       = "project"
        CtxTargetDomain  = "target-domain"
        CtxTargetProject = "target-project"
+       SPLIT            = "/"
 )
 
 type StringContext struct {
@@ -119,11 +121,11 @@ func SetRequestContext(r *http.Request, key string, val 
interface{}) *http.Reque
 }
 
 func ParseDomainProject(ctx context.Context) string {
-       return ParseDomain(ctx) + "/" + ParseProject(ctx)
+       return ParseDomain(ctx) + SPLIT + ParseProject(ctx)
 }
 
 func ParseTargetDomainProject(ctx context.Context) string {
-       return ParseTargetDomain(ctx) + "/" + ParseTargetProject(ctx)
+       return ParseTargetDomain(ctx) + SPLIT + ParseTargetProject(ctx)
 }
 
 func ParseDomain(ctx context.Context) string {
@@ -178,6 +180,14 @@ func SetDomainProject(ctx context.Context, domain string, 
project string) contex
        return SetProject(SetDomain(ctx, domain), project)
 }
 
+func SetDomainProjectString(ctx context.Context, domainProject string) 
context.Context {
+       arr := strings.Split(domainProject, SPLIT)
+       if len(arr) != 2 {
+               return ctx
+       }
+       return SetProject(SetDomain(ctx, arr[0]), arr[1])
+}
+
 func SetTargetDomainProject(ctx context.Context, domain string, project 
string) context.Context {
        return SetTargetProject(SetTargetDomain(ctx, domain), project)
 }
diff --git a/server/connection/metrics.go b/server/connection/metrics.go
index 76aa41e..a83b229 100644
--- a/server/connection/metrics.go
+++ b/server/connection/metrics.go
@@ -47,6 +47,23 @@ var (
                        Objectives: metric.Pxx,
                }, []string{"instance", "source", "status"})
 
+       pendingGauge = prometheus.NewGaugeVec(
+               prometheus.GaugeOpts{
+                       Namespace: metric.FamilyName,
+                       Subsystem: "notify",
+                       Name:      "pending_total",
+                       Help:      "Counter of pending instance events",
+               }, []string{"instance", "source"})
+
+       pendingLatency = prometheus.NewSummaryVec(
+               prometheus.SummaryOpts{
+                       Namespace:  metric.FamilyName,
+                       Subsystem:  "notify",
+                       Name:       "pending_durations_microseconds",
+                       Help:       "Latency of pending instance events",
+                       Objectives: metric.Pxx,
+               }, []string{"instance", "source"})
+
        subscriberGauge = prometheus.NewGaugeVec(
                prometheus.GaugeOpts{
                        Namespace: metric.FamilyName,
@@ -57,7 +74,7 @@ var (
 )
 
 func init() {
-       prometheus.MustRegister(notifyCounter, notifyLatency, subscriberGauge)
+       prometheus.MustRegister(notifyCounter, notifyLatency, subscriberGauge, 
pendingLatency, pendingGauge)
 }
 
 func ReportPublishCompleted(evt event.Event, err error) {
@@ -69,6 +86,14 @@ func ReportPublishCompleted(evt event.Event, err error) {
        }
        notifyLatency.WithLabelValues(instance, evt.Type().String(), 
status).Observe(elapsed)
        notifyCounter.WithLabelValues(instance, evt.Type().String(), 
status).Inc()
+       pendingGauge.WithLabelValues(instance, evt.Type().String()).Dec()
+}
+
+func ReportPendingCompleted(evt event.Event) {
+       instance := metric.InstanceName()
+       elapsed := float64(time.Since(evt.CreateAt()).Nanoseconds()) / 
float64(time.Microsecond)
+       pendingLatency.WithLabelValues(instance, 
evt.Type().String()).Observe(elapsed)
+       pendingGauge.WithLabelValues(instance, evt.Type().String()).Inc()
 }
 
 func ReportSubscriber(domain, scheme string, n float64) {
diff --git a/server/connection/ws/websocket.go 
b/server/connection/ws/websocket.go
index 24a7e2b..380c9db 100644
--- a/server/connection/ws/websocket.go
+++ b/server/connection/ws/websocket.go
@@ -19,16 +19,19 @@ package ws
 
 import (
        "context"
-       "fmt"
+       "errors"
+       "time"
+
        "github.com/apache/servicecomb-service-center/pkg/log"
        "github.com/apache/servicecomb-service-center/server/connection"
        serviceUtil 
"github.com/apache/servicecomb-service-center/server/service/util"
        "github.com/gorilla/websocket"
-       "time"
 )
 
 const Websocket = "Websocket"
 
+var errServiceNotExist = errors.New("service does not exist")
+
 type WebSocket struct {
        Options
        Conn          *websocket.Conn
@@ -146,13 +149,11 @@ func (wh *WebSocket) CheckHealth(ctx context.Context) 
error {
        }
 
        if !serviceUtil.ServiceExist(ctx, wh.DomainProject, wh.ConsumerID) {
-               return fmt.Errorf("Service does not exist.")
+               return errServiceNotExist
        }
 
        remoteAddr := wh.Conn.RemoteAddr().String()
        if err := wh.WritePingPong(websocket.PingMessage); err != nil {
-               log.Errorf(err, "send 'Ping' message to subscriber[%s] failed, 
consumer: %s",
-                       remoteAddr, wh.ConsumerID)
                return err
        }
 
@@ -162,18 +163,7 @@ func (wh *WebSocket) CheckHealth(ctx context.Context) 
error {
 }
 
 func (wh *WebSocket) WritePingPong(messageType int) error {
-       err := wh.Conn.WriteControl(messageType, []byte{}, 
time.Now().Add(wh.SendTimeout))
-       if err != nil {
-               messageTypeName := "Ping"
-               if messageType == websocket.PongMessage {
-                       messageTypeName = "Pong"
-               }
-               log.Errorf(err, "fail to send '%s' to subscriber[%s], consumer: 
%s",
-                       messageTypeName, wh.Conn.RemoteAddr(), wh.ConsumerID)
-               //wh.subscriber.SetError(err)
-               return err
-       }
-       return nil
+       return wh.Conn.WriteControl(messageType, []byte{}, 
time.Now().Add(wh.SendTimeout))
 }
 
 func (wh *WebSocket) WriteTextMessage(message []byte) error {
diff --git a/server/connection/ws/websocket_test.go 
b/server/connection/ws/websocket_test.go
index ca40bd6..7518b5a 100644
--- a/server/connection/ws/websocket_test.go
+++ b/server/connection/ws/websocket_test.go
@@ -159,6 +159,6 @@ func TestWebSocket_CheckHealth(t *testing.T) {
        })
        t.Run("should return err when consumer not exist", func(t *testing.T) {
                ws := wss.NewWebSocket("", "", mock.ServerConn)
-               assert.Equal(t, "Service does not exist.", 
ws.CheckHealth(context.Background()).Error())
+               assert.Equal(t, "service does not exist", 
ws.CheckHealth(context.Background()).Error())
        })
 }
diff --git a/server/event/instance_subscriber.go 
b/server/event/instance_subscriber.go
index cb1895f..f270fcb 100644
--- a/server/event/instance_subscriber.go
+++ b/server/event/instance_subscriber.go
@@ -18,12 +18,13 @@
 package event
 
 import (
+       "errors"
        "github.com/apache/servicecomb-service-center/pkg/event"
        "github.com/apache/servicecomb-service-center/pkg/log"
-       "time"
+       "github.com/apache/servicecomb-service-center/server/connection"
 )
 
-const AddJobTimeout = 1 * time.Second
+var errBusy = errors.New("too busy")
 
 type InstanceSubscriber struct {
        event.Subscriber
@@ -47,40 +48,49 @@ func (w *InstanceSubscriber) OnAccept() {
 }
 
 //被通知
-func (w *InstanceSubscriber) OnMessage(job event.Event) {
+func (w *InstanceSubscriber) OnMessage(evt event.Event) {
        if w.Err() != nil {
                return
        }
 
-       wJob, ok := job.(*InstanceEvent)
+       wJob, ok := evt.(*InstanceEvent)
        if !ok {
                return
        }
        w.sendMessage(wJob)
 }
 
-func (w *InstanceSubscriber) sendMessage(job *InstanceEvent) {
+func (w *InstanceSubscriber) sendMessage(evt *InstanceEvent) {
        defer log.Recover()
+
+       connection.ReportPendingCompleted(evt)
+
        select {
-       case w.Job <- job:
+       case w.Job <- evt:
        default:
-               timer := time.NewTimer(w.Timeout())
-               select {
-               case w.Job <- job:
-                       timer.Stop()
-               case <-timer.C:
-                       log.Errorf(nil,
-                               "the %s watcher %s %s event queue is full[over 
%s], drop the event %v",
-                               w.Type(), w.Group(), w.Subject(), w.Timeout(), 
job)
-               }
+               log.Errorf(nil, "the %s watcher %s %s event queue is full, drop 
the blocked events",
+                       w.Type(), w.Group(), w.Subject())
+               w.cleanup()
+               w.Job <- evt
        }
 }
 
-func (w *InstanceSubscriber) Timeout() time.Duration {
-       return AddJobTimeout
+func (w *InstanceSubscriber) cleanup() {
+       for {
+               select {
+               case evt, ok := <-w.Job:
+                       if !ok {
+                               return
+                       }
+                       connection.ReportPublishCompleted(evt, errBusy)
+               default:
+                       return
+               }
+       }
 }
 
 func (w *InstanceSubscriber) Close() {
+       w.cleanup()
        close(w.Job)
 }
 
diff --git a/server/metric/prometheus/metrics.go 
b/server/metric/prometheus/metrics.go
index 2c2c5c7..e456927 100644
--- a/server/metric/prometheus/metrics.go
+++ b/server/metric/prometheus/metrics.go
@@ -21,8 +21,10 @@ import (
        "github.com/apache/servicecomb-service-center/pkg/rest"
        "github.com/apache/servicecomb-service-center/pkg/util"
        "github.com/apache/servicecomb-service-center/server/metric"
+       api "github.com/apache/servicecomb-service-center/server/rest"
        "github.com/prometheus/client_golang/prometheus"
        "net/http"
+       "os"
        "strconv"
        "strings"
        "time"
@@ -65,6 +67,9 @@ var (
 
 func init() {
        prometheus.MustRegister(incomingRequests, successfulRequests, 
reqDurations, queryPerSeconds)
+       if "true" == os.Getenv("METRICS_ENABLE") {
+               api.RegisterServerHandler("/metrics", prometheus.Handler())
+       }
 }
 
 func ReportRequestCompleted(w http.ResponseWriter, r *http.Request, start 
time.Time) {
diff --git a/server/metric/prometheus/reporter.go 
b/server/metric/prometheus/reporter.go
index f089b90..32cc6df 100644
--- a/server/metric/prometheus/reporter.go
+++ b/server/metric/prometheus/reporter.go
@@ -18,7 +18,6 @@ package prometheus
 import (
        "github.com/apache/servicecomb-service-center/server/metric"
        dto "github.com/prometheus/client_model/go"
-       "os"
 )
 
 const (
@@ -65,9 +64,7 @@ func (r *APIReporter) toLabels(pairs []*dto.LabelPair) (labels []string) {
 }
 
 func init() {
-       if "true" == os.Getenv("METRICS_ENABLE") {
-               metric.RegisterReporter("rest", NewAPIReporter())
-       }
+       metric.RegisterReporter("rest", NewAPIReporter())
 }
 
 func NewAPIReporter() *APIReporter {
diff --git a/server/plugin/discovery/etcd/cacher_kv.go b/server/plugin/discovery/etcd/cacher_kv.go
index a61d1e3..5a9beea 100644
--- a/server/plugin/discovery/etcd/cacher_kv.go
+++ b/server/plugin/discovery/etcd/cacher_kv.go
@@ -451,6 +451,7 @@ func (c *KvCacher) notify(evts []discovery.KvEvent) {
        for _, evt := range evts {
                c.Cfg.OnEvent(evt)
        }
+       discovery.ReportDispatchEventCompleted(c.Cfg.Key, evts)
 }
 
 func (c *KvCacher) doParse(src *mvccpb.KeyValue) (kv *discovery.KeyValue) {
diff --git a/server/plugin/discovery/metrics.go b/server/plugin/discovery/metrics.go
index 4cd41a2..ce434ca 100644
--- a/server/plugin/discovery/metrics.go
+++ b/server/plugin/discovery/metrics.go
@@ -39,10 +39,27 @@ var (
                        Help:       "Latency of backend events processing",
                        Objectives: metric.Pxx,
                }, []string{"instance", "prefix"})
+
+       dispatchCounter = prometheus.NewGaugeVec(
+               prometheus.GaugeOpts{
+                       Namespace: metric.FamilyName,
+                       Subsystem: "db",
+                       Name:      "dispatch_event_total",
+                       Help:      "Counter of backend events dispatch",
+               }, []string{"instance", "prefix"})
+
+       dispatchLatency = prometheus.NewSummaryVec(
+               prometheus.SummaryOpts{
+                       Namespace:  metric.FamilyName,
+                       Subsystem:  "db",
+                       Name:       "dispatch_event_durations_microseconds",
+                       Help:       "Latency of backend events dispatch",
+                       Objectives: metric.Pxx,
+               }, []string{"instance", "prefix"})
 )
 
 func init() {
-       prometheus.MustRegister(eventsCounter, eventsLatency)
+       prometheus.MustRegister(eventsCounter, eventsLatency, dispatchCounter, dispatchLatency)
 }
 
 func ReportProcessEventCompleted(prefix string, evts []KvEvent) {
@@ -57,4 +74,19 @@ func ReportProcessEventCompleted(prefix string, evts []KvEvent) {
                eventsLatency.WithLabelValues(instance, prefix).Observe(elapsed)
        }
        eventsCounter.WithLabelValues(instance, prefix).Add(l)
+       dispatchCounter.WithLabelValues(instance, prefix).Add(l)
+}
+
+func ReportDispatchEventCompleted(prefix string, evts []KvEvent) {
+       l := float64(len(evts))
+       if l == 0 {
+               return
+       }
+       instance := metric.InstanceName()
+       now := time.Now()
+       for _, evt := range evts {
+               elapsed := float64(now.Sub(evt.CreateAt.Local()).Nanoseconds()) / float64(time.Microsecond)
+               dispatchLatency.WithLabelValues(instance, prefix).Observe(elapsed)
+       }
+       dispatchCounter.WithLabelValues(instance, prefix).Add(-l)
 }
diff --git a/server/plugin/discovery/servicecenter/common.go b/server/plugin/discovery/servicecenter/common.go
index d59957f..a107db1 100644
--- a/server/plugin/discovery/servicecenter/common.go
+++ b/server/plugin/discovery/servicecenter/common.go
@@ -1,17 +1,19 @@
-// Licensed to the Apache Software Foundation (ASF) under one or more
-// contributor license agreements.  See the NOTICE file distributed with
-// this work for additional information regarding copyright ownership.
-// The ASF licenses this file to You under the Apache License, Version 2.0
-// (the "License"); you may not use this file except in compliance with
-// the License.  You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 
 package servicecenter
 
diff --git a/server/service/event/instance_event_handler.go b/server/service/event/instance_event_handler.go
index 0d6010d..46f9b3a 100644
--- a/server/service/event/instance_event_handler.go
+++ b/server/service/event/instance_event_handler.go
@@ -18,6 +18,9 @@ package event
 
 import (
        "context"
+       "fmt"
+       "strings"
+
        "github.com/apache/servicecomb-service-center/pkg/log"
        pb "github.com/apache/servicecomb-service-center/pkg/registry"
        "github.com/apache/servicecomb-service-center/pkg/util"
@@ -29,7 +32,6 @@ import (
        "github.com/apache/servicecomb-service-center/server/service/cache"
        "github.com/apache/servicecomb-service-center/server/service/metrics"
        serviceUtil "github.com/apache/servicecomb-service-center/server/service/util"
-       "strings"
 )
 
 const (
@@ -57,12 +59,15 @@ func (h *InstanceEventHandler) OnEvent(evt discovery.KvEvent) {
        domainName := domainProject[:idx]
        projectName := domainProject[idx+1:]
 
+       ctx := context.WithValue(context.WithValue(context.Background(),
+               util.CtxCacheOnly, "1"),
+               util.CtxGlobal, "1")
+
        var count float64 = increaseOne
-       switch action {
-       case pb.EVT_INIT:
+       if action == pb.EVT_INIT {
                metrics.ReportInstances(domainName, count)
-               ms := serviceUtil.GetServiceFromCache(domainProject, providerID)
-               if ms == nil {
+               ms, err := serviceUtil.GetService(ctx, domainProject, providerID)
+               if err != nil {
                        log.Warnf("caught [%s] instance[%s/%s] event, endpoints %v, get cached provider's file failed",
                                action, providerID, providerInstanceID, instance.Endpoints)
                        return
@@ -70,11 +75,10 @@ func (h *InstanceEventHandler) OnEvent(evt discovery.KvEvent) {
                frameworkName, frameworkVersion := getFramework(ms)
                metrics.ReportFramework(domainName, projectName, frameworkName, frameworkVersion, count)
                return
-       case pb.EVT_CREATE:
-               metrics.ReportInstances(domainName, count)
-       case pb.EVT_DELETE:
+       }
+
+       if action == pb.EVT_DELETE {
                count = decreaseOne
-               metrics.ReportInstances(domainName, count)
                if !apt.IsDefaultDomainProject(domainProject) {
                        projectName := domainProject[idx+1:]
                        serviceUtil.RemandInstanceQuota(
@@ -82,25 +86,25 @@ func (h *InstanceEventHandler) OnEvent(evt discovery.KvEvent) {
                }
        }
 
-       if event.Center().Closed() {
-               log.Warnf("caught [%s] instance[%s/%s] event, endpoints %v, but notify service is closed",
-                       action, providerID, providerInstanceID, instance.Endpoints)
+       // 查询服务版本信息
+       ms, err := serviceUtil.GetService(ctx, domainProject, providerID)
+       if err != nil {
+               log.Error(fmt.Sprintf("caught [%s] instance[%s/%s] event, endpoints %v, get cached provider's file failed",
+                       action, providerID, providerInstanceID, instance.Endpoints), err)
                return
        }
 
-       // 查询服务版本信息
-       ctx := context.WithValue(context.WithValue(context.Background(),
-               util.CtxCacheOnly, "1"),
-               util.CtxGlobal, "1")
-       ms, err := serviceUtil.GetService(ctx, domainProject, providerID)
-       if ms == nil {
-               log.Errorf(err, "caught [%s] instance[%s/%s] event, endpoints %v, get cached provider's file failed",
+       if event.Center().Closed() {
+               log.Warnf("caught [%s] instance[%s/%s] event, endpoints %v, but notify service is closed",
                        action, providerID, providerInstanceID, instance.Endpoints)
                return
        }
 
-       frameworkName, frameworkVersion := getFramework(ms)
-       metrics.ReportFramework(domainName, projectName, frameworkName, frameworkVersion, count)
+       if action != pb.EVT_UPDATE {
+               frameworkName, frameworkVersion := getFramework(ms)
+               metrics.ReportInstances(domainName, count)
+               metrics.ReportFramework(domainName, projectName, frameworkName, frameworkVersion, count)
+       }
 
        log.Infof("caught [%s] service[%s][%s/%s/%s/%s] instance[%s] event, endpoints %v",
                action, providerID, ms.Environment, ms.AppId, ms.ServiceName, ms.Version,
@@ -135,7 +139,6 @@ func PublishInstanceEvent(evt discovery.KvEvent, domainProject string, serviceKe
                Instance: evt.KV.Value.(*pb.MicroServiceInstance),
        }
        for _, consumerID := range subscribers {
-               // TODO add超时怎么处理?
                evt := event.NewInstanceEventWithTime(consumerID, domainProject, evt.Revision, evt.CreateAt, response)
                err := event.Center().Fire(evt)
                if err != nil {
diff --git a/server/service/util/microservice_util.go b/server/service/util/microservice_util.go
index c10f33f..987d60d 100644
--- a/server/service/util/microservice_util.go
+++ b/server/service/util/microservice_util.go
@@ -64,15 +64,6 @@ func GetService(ctx context.Context, domainProject string, serviceID string) (*p
        return serviceResp.Kvs[0].Value.(*pb.MicroService), nil
 }
 
-// GetServiceFromCache gets service from cache
-func GetServiceFromCache(domainProject string, serviceID string) *pb.MicroService {
-       ctx := context.WithValue(context.WithValue(context.Background(),
-               util.CtxCacheOnly, "1"),
-               util.CtxGlobal, "1")
-       svc, _ := GetService(ctx, domainProject, serviceID)
-       return svc
-}
-
 func getServicesRawData(ctx context.Context, domainProject string) ([]*discovery.KeyValue, error) {
        key := apt.GenerateServiceKey(domainProject, "")
        opts := append(FromContext(ctx),

Reply via email to