This is an automated email from the ASF dual-hosted git repository.

littlecui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/servicecomb-service-center.git


The following commit(s) were added to refs/heads/master by this push:
     new f120eb3  SCB-1092 Output event processing time (#533)
f120eb3 is described below

commit f120eb3fbed95559002eeba432a30473c061a4cd
Author: little-cui <[email protected]>
AuthorDate: Thu Feb 14 11:02:28 2019 +0800

    SCB-1092 Output event processing time (#533)
    
    * SCB-1092 Output event processing time
    
    * SCB-1092 More abundant metrics information
---
 integration/health-metrics-grafana.json        | 330 ++++++++++++++++++++++---
 pkg/buffer/reader.go                           |  38 +++
 pkg/buffer/reader_test.go                      |  51 ++++
 pkg/client/sc/client_lb.go                     |  21 +-
 pkg/client/sc/client_lb_test.go                |  56 +++++
 pkg/notify/notice.go                           |   8 +-
 pkg/notify/notice_test.go                      |  33 +++
 pkg/rest/client.go                             |  54 ++--
 server/notify/listwatcher.go                   |  17 +-
 server/notify/metrics.go                       |   9 +-
 server/notify/stream.go                        |   4 +-
 server/notify/stream_test.go                   |  25 +-
 server/notify/websocket.go                     |   5 +-
 server/plugin/pkg/discovery/cacher.go          |   2 +-
 server/plugin/pkg/discovery/etcd/cacher_kv.go  |  23 +-
 server/plugin/pkg/discovery/metrics.go         |  20 +-
 server/plugin/pkg/discovery/types.go           |   7 +
 server/service/event/instance_event_handler.go |  11 +-
 server/service/event/rule_event_handler.go     |  16 +-
 server/service/event/tag_event_handler.go      |  20 +-
 20 files changed, 623 insertions(+), 127 deletions(-)

diff --git a/integration/health-metrics-grafana.json b/integration/health-metrics-grafana.json
index 005c2bb..e7acfb7 100644
--- a/integration/health-metrics-grafana.json
+++ b/integration/health-metrics-grafana.json
@@ -1157,7 +1157,7 @@
       "fill": 1,
       "gridPos": {
         "h": 6,
-        "w": 8,
+        "w": 6,
         "x": 12,
         "y": 13
       },
@@ -1239,6 +1239,99 @@
       }
     },
     {
+      "aliasColors": {},
+      "bars": false,
+      "dashLength": 10,
+      "dashes": false,
+      "datasource": "${DS_LOCAL}",
+      "fill": 1,
+      "gridPos": {
+        "h": 6,
+        "w": 6,
+        "x": 18,
+        "y": 13
+      },
+      "height": "",
+      "id": 42,
+      "legend": {
+        "alignAsTable": true,
+        "avg": true,
+        "current": false,
+        "hideEmpty": true,
+        "hideZero": true,
+        "max": true,
+        "min": true,
+        "rightSide": false,
+        "show": true,
+        "sort": "avg",
+        "sortDesc": true,
+        "total": false,
+        "values": true
+      },
+      "lines": true,
+      "linewidth": 1,
+      "links": [],
+      "minSpan": 4,
+      "nullPointMode": "null",
+      "percentage": false,
+      "pointradius": 5,
+      "points": false,
+      "renderer": "flot",
+      "seriesOverrides": [],
+      "spaceLength": 10,
+      "stack": false,
+      "steppedLine": false,
+      "targets": [
+        {
+          "expr": "max(avg_over_time(service_center_notify_publish_durations_microseconds{job=\"service-center\"}[1m])) by (source)",
+          "format": "time_series",
+          "intervalFactor": 1,
+          "legendFormat": "{{source}}",
+          "refId": "B"
+        }
+      ],
+      "thresholds": [],
+      "timeFrom": null,
+      "timeShift": null,
+      "title": "Events Latency",
+      "tooltip": {
+        "shared": true,
+        "sort": 0,
+        "value_type": "individual"
+      },
+      "transparent": false,
+      "type": "graph",
+      "xaxis": {
+        "buckets": null,
+        "mode": "time",
+        "name": null,
+        "show": true,
+        "values": []
+      },
+      "yaxes": [
+        {
+          "format": "µs",
+          "label": null,
+          "logBase": 1,
+          "max": null,
+          "min": null,
+          "show": true
+        },
+        {
+          "format": "short",
+          "label": null,
+          "logBase": 1,
+          "max": null,
+          "min": null,
+          "show": true
+        }
+      ],
+      "yaxis": {
+        "align": false,
+        "alignLevel": null
+      }
+    },
+    {
       "cacheTimeout": null,
       "colorBackground": false,
       "colorValue": true,
@@ -2030,25 +2123,205 @@
       "steppedLine": false,
       "targets": [
         {
-          "expr": "sum(service_center_notify_publish_total{job=\"service-center\"}) by (instance,source)",
+          "expr": "sum(irate(service_center_db_backend_event_total{job=\"service-center\"}[1m])) by (instance,prefix)",
           "format": "time_series",
-          "instant": false,
           "intervalFactor": 1,
-          "legendFormat": "{{instance}}> {{source}}",
+          "legendFormat": "{{instance}}> {{prefix}}",
+          "refId": "B"
+        }
+      ],
+      "thresholds": [],
+      "timeFrom": null,
+      "timeShift": null,
+      "title": "Publish Events",
+      "tooltip": {
+        "shared": true,
+        "sort": 0,
+        "value_type": "individual"
+      },
+      "transparent": false,
+      "type": "graph",
+      "xaxis": {
+        "buckets": null,
+        "mode": "time",
+        "name": null,
+        "show": true,
+        "values": []
+      },
+      "yaxes": [
+        {
+          "decimals": 0,
+          "format": "ops",
+          "label": null,
+          "logBase": 1,
+          "max": null,
+          "min": null,
+          "show": true
+        },
+        {
+          "format": "short",
+          "label": null,
+          "logBase": 1,
+          "max": null,
+          "min": null,
+          "show": true
+        }
+      ],
+      "yaxis": {
+        "align": false,
+        "alignLevel": null
+      }
+    },
+    {
+      "aliasColors": {},
+      "bars": false,
+      "dashLength": 10,
+      "dashes": false,
+      "datasource": "${DS_LOCAL}",
+      "fill": 1,
+      "gridPos": {
+        "h": 6,
+        "w": 6,
+        "x": 6,
+        "y": 34
+      },
+      "height": "",
+      "id": 41,
+      "legend": {
+        "alignAsTable": false,
+        "avg": true,
+        "current": false,
+        "hideEmpty": true,
+        "hideZero": true,
+        "max": true,
+        "min": true,
+        "rightSide": true,
+        "show": false,
+        "sort": "avg",
+        "sortDesc": true,
+        "total": false,
+        "values": true
+      },
+      "lines": true,
+      "linewidth": 1,
+      "links": [],
+      "minSpan": 4,
+      "nullPointMode": "null",
+      "percentage": false,
+      "pointradius": 5,
+      "points": false,
+      "renderer": "flot",
+      "seriesOverrides": [],
+      "spaceLength": 10,
+      "stack": false,
+      "steppedLine": false,
+      "targets": [
+        {
+          "expr": "max(avg_over_time(service_center_db_backend_event_durations_microseconds{job=\"service-center\"}[1m])) by (instance, prefix)",
+          "format": "time_series",
+          "intervalFactor": 1,
+          "legendFormat": "{{instance}}> {{prefix}}",
           "refId": "A"
+        }
+      ],
+      "thresholds": [],
+      "timeFrom": null,
+      "timeShift": null,
+      "title": "Publish Events Latency",
+      "tooltip": {
+        "shared": true,
+        "sort": 0,
+        "value_type": "individual"
+      },
+      "transparent": false,
+      "type": "graph",
+      "xaxis": {
+        "buckets": null,
+        "mode": "time",
+        "name": null,
+        "show": true,
+        "values": []
+      },
+      "yaxes": [
+        {
+          "format": "µs",
+          "label": null,
+          "logBase": 1,
+          "max": null,
+          "min": null,
+          "show": true
         },
         {
-          "expr": "sum(service_center_db_backend_event_total{job=\"service-center\"}) by (instance)",
+          "format": "short",
+          "label": null,
+          "logBase": 1,
+          "max": null,
+          "min": null,
+          "show": true
+        }
+      ],
+      "yaxis": {
+        "align": false,
+        "alignLevel": null
+      }
+    },
+    {
+      "aliasColors": {},
+      "bars": false,
+      "dashLength": 10,
+      "dashes": false,
+      "datasource": "${DS_LOCAL}",
+      "fill": 1,
+      "gridPos": {
+        "h": 6,
+        "w": 6,
+        "x": 12,
+        "y": 34
+      },
+      "height": "",
+      "id": 40,
+      "legend": {
+        "alignAsTable": false,
+        "avg": false,
+        "current": false,
+        "hideEmpty": true,
+        "hideZero": true,
+        "max": false,
+        "min": false,
+        "rightSide": false,
+        "show": false,
+        "sort": "avg",
+        "sortDesc": true,
+        "total": false,
+        "values": false
+      },
+      "lines": true,
+      "linewidth": 1,
+      "links": [],
+      "minSpan": 4,
+      "nullPointMode": "null",
+      "percentage": false,
+      "pointradius": 5,
+      "points": false,
+      "renderer": "flot",
+      "seriesOverrides": [],
+      "spaceLength": 10,
+      "stack": false,
+      "steppedLine": false,
+      "targets": [
+        {
+          "expr": "sum(irate(service_center_notify_publish_total{job=\"service-center\"}[1m])) by (instance,source)",
           "format": "time_series",
+          "instant": false,
           "intervalFactor": 1,
-          "legendFormat": "{{instance}}> BACKEND",
-          "refId": "B"
+          "legendFormat": "{{instance}}> {{source}}",
+          "refId": "A"
         }
       ],
       "thresholds": [],
       "timeFrom": null,
       "timeShift": null,
-      "title": "Events Total",
+      "title": "Subscribe Events",
       "tooltip": {
         "shared": true,
         "sort": 0,
@@ -2066,7 +2339,7 @@
       "yaxes": [
         {
           "decimals": 0,
-          "format": "none",
+          "format": "ops",
           "label": null,
           "logBase": 1,
           "max": null,
@@ -2097,7 +2370,7 @@
       "gridPos": {
         "h": 6,
         "w": 6,
-        "x": 6,
+        "x": 18,
         "y": 34
       },
       "height": "",
@@ -2137,19 +2410,12 @@
           "intervalFactor": 1,
           "legendFormat": "{{instance}}> {{source}}",
           "refId": "B"
-        },
-        {
-          "expr": "max(avg_over_time(service_center_db_backend_event_durations_microseconds{job=\"service-center\"}[1m])) by (instance)",
-          "format": "time_series",
-          "intervalFactor": 1,
-          "legendFormat": "{{instance}}> BACKEND",
-          "refId": "A"
         }
       ],
       "thresholds": [],
       "timeFrom": null,
       "timeShift": null,
-      "title": "Events Latency",
+      "title": "Subscribe Events Latency",
       "tooltip": {
         "shared": true,
         "sort": 0,
@@ -2197,8 +2463,8 @@
       "gridPos": {
         "h": 6,
         "w": 6,
-        "x": 12,
-        "y": 34
+        "x": 0,
+        "y": 40
       },
       "height": "",
       "id": 37,
@@ -2292,8 +2558,8 @@
       "gridPos": {
         "h": 6,
         "w": 6,
-        "x": 18,
-        "y": 34
+        "x": 6,
+        "y": 40
       },
       "height": "",
       "id": 38,
@@ -2381,7 +2647,7 @@
         "h": 3,
         "w": 24,
         "x": 0,
-        "y": 40
+        "y": 46
       },
       "height": "1px",
       "id": 4,
@@ -2402,7 +2668,7 @@
         "h": 7,
         "w": 8,
         "x": 0,
-        "y": 43
+        "y": 49
       },
       "height": "",
       "id": 9,
@@ -2490,7 +2756,7 @@
         "h": 7,
         "w": 8,
         "x": 8,
-        "y": 43
+        "y": 49
       },
       "id": 14,
       "legend": {
@@ -2575,7 +2841,7 @@
         "h": 7,
         "w": 8,
         "x": 16,
-        "y": 43
+        "y": 49
       },
       "id": 5,
       "legend": {
@@ -2660,7 +2926,7 @@
         "h": 7,
         "w": 8,
         "x": 0,
-        "y": 50
+        "y": 56
       },
       "id": 6,
       "legend": {
@@ -2745,7 +3011,7 @@
         "h": 7,
         "w": 8,
         "x": 8,
-        "y": 50
+        "y": 56
       },
       "height": "",
       "id": 2,
@@ -2834,7 +3100,7 @@
         "h": 7,
         "w": 8,
         "x": 16,
-        "y": 50
+        "y": 56
       },
       "height": "",
       "id": 8,
@@ -2921,7 +3187,7 @@
         "h": 7,
         "w": 24,
         "x": 0,
-        "y": 57
+        "y": 63
       },
       "id": 7,
       "legend": {
@@ -3014,7 +3280,7 @@
     "list": []
   },
   "time": {
-    "from": "now-1h",
+    "from": "now-5m",
     "to": "now"
   },
   "timepicker": {
@@ -3045,5 +3311,5 @@
   "timezone": "",
   "title": "ServiceCenter",
   "uid": "Zg6NoHGiz",
-  "version": 7
+  "version": 9
 }
\ No newline at end of file
diff --git a/pkg/buffer/reader.go b/pkg/buffer/reader.go
new file mode 100644
index 0000000..06a0c23
--- /dev/null
+++ b/pkg/buffer/reader.go
@@ -0,0 +1,38 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package buffer
+
+import (
+       "bytes"
+       "io"
+       "strings"
+)
+
+func ReadLine(buf *bytes.Buffer, cb func(line string) bool) error {
+       for {
+               s, err := buf.ReadString('\n')
+               if err != nil && err != io.EOF {
+                       return err
+               }
+               if !cb(strings.TrimSpace(s)) {
+                       break
+               }
+               if err != nil {
+                       break
+               }
+       }
+       return nil
+}
diff --git a/pkg/buffer/reader_test.go b/pkg/buffer/reader_test.go
new file mode 100644
index 0000000..36caf39
--- /dev/null
+++ b/pkg/buffer/reader_test.go
@@ -0,0 +1,51 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package buffer
+
+import (
+       "bytes"
+       "testing"
+)
+
+func TestReadLine(t *testing.T) {
+       buf := bytes.NewBuffer([]byte("a\nb\r\nc\n "))
+       err := ReadLine(buf, func(line string) bool {
+               switch line {
+               case "a", "b", "c", "":
+               default:
+                       t.Fatal("TestReadLine")
+               }
+               return true
+       })
+       if err != nil {
+               t.Fatal("TestReadLine", err)
+       }
+
+       buf = bytes.NewBuffer([]byte("a\nb"))
+       err = ReadLine(buf, func(line string) bool {
+               switch line {
+               case "a":
+               case "b":
+                       t.Fatal("TestReadLine")
+               default:
+                       t.Fatal("TestReadLine")
+               }
+               return false
+       })
+       if err != nil {
+               t.Fatal("TestReadLine", err)
+       }
+}
diff --git a/pkg/client/sc/client_lb.go b/pkg/client/sc/client_lb.go
index 55818b1..8b99b01 100644
--- a/pkg/client/sc/client_lb.go
+++ b/pkg/client/sc/client_lb.go
@@ -16,9 +16,11 @@
 package sc
 
 import (
-       "github.com/apache/servicecomb-service-center/pkg/backoff"
+       "errors"
+       "fmt"
        "github.com/apache/servicecomb-service-center/pkg/lb"
        "github.com/apache/servicecomb-service-center/pkg/rest"
+       "github.com/apache/servicecomb-service-center/pkg/util"
        "golang.org/x/net/context"
        "net/http"
 )
@@ -46,9 +48,18 @@ func (c *LBClient) Next() string {
 }
 
 func (c *LBClient) RestDoWithContext(ctx context.Context, method string, api string, headers http.Header, body []byte) (resp *http.Response, err error) {
-       err = backoff.DelayIn(c.Retries, func() (rerr error) {
-               resp, rerr = c.HttpDoWithContext(ctx, method, c.Next()+api, headers, body)
-               return
-       })
+       var errs []string
+       for i := 0; i < c.Retries; i++ {
+               addr := c.Next()
+               resp, err = c.HttpDoWithContext(ctx, method, addr+api, headers, body)
+               if err != nil {
+                       errs = append(errs, fmt.Sprintf("[%s]: %s", addr, err.Error()))
+                       continue
+               }
+               break
+       }
+       if err != nil {
+               err = errors.New(util.StringJoin(errs, ", "))
+       }
        return
 }
diff --git a/pkg/client/sc/client_lb_test.go b/pkg/client/sc/client_lb_test.go
new file mode 100644
index 0000000..0dcf159
--- /dev/null
+++ b/pkg/client/sc/client_lb_test.go
@@ -0,0 +1,56 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package sc
+
+import (
+       "fmt"
+       "github.com/apache/servicecomb-service-center/pkg/rest"
+       "golang.org/x/net/context"
+       "io/ioutil"
+       "net/http"
+       "net/http/httptest"
+       "os"
+       "testing"
+)
+
+func TestNewLBClient(t *testing.T) {
+       os.Setenv("DEBUG_MODE", "1")
+       client, err := NewLBClient([]string{"x.x.x.x", "rest://2.2.2.2"}, rest.DefaultURLClientOption())
+       if err != nil {
+               t.Fatal("TestNewLBClient", err)
+       }
+       _, err = client.RestDoWithContext(context.Background(), "yyy", "/zzz", http.Header{"test": []string{"a"}}, []byte(`abcdef`))
+       if err == nil {
+               t.Fatal("TestNewLBClient")
+       }
+       fmt.Println(err)
+
+       svc := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
+               w.WriteHeader(http.StatusOK)
+               b, _ := ioutil.ReadAll(req.Body)
+               w.Write(b)
+       }))
+       defer svc.Close()
+
+       client, err = NewLBClient([]string{"x.x.x.x", svc.URL}, rest.DefaultURLClientOption())
+       if err != nil {
+               t.Fatal("TestNewLBClient", err)
+       }
+       _, err = client.RestDoWithContext(context.Background(), http.MethodGet, "", http.Header{"test": []string{"a"}}, []byte(`abcdef`))
+       if err != nil {
+               t.Fatal("TestNewLBClient", err)
+       }
+}
diff --git a/pkg/notify/notice.go b/pkg/notify/notice.go
index e5ac265..cc3b272 100644
--- a/pkg/notify/notice.go
+++ b/pkg/notify/notice.go
@@ -51,6 +51,10 @@ func (s *baseEvent) CreateAt() time.Time {
        return s.createAt.Local()
 }
 
-func NewEvent(t Type, s string, g string) Event {
-       return &baseEvent{t, s, g, simple.FromTime(time.Now())}
+func NewEvent(t Type, s, g string) Event {
+       return NewEventWithTime(t, s, g, simple.FromTime(time.Now()))
+}
+
+func NewEventWithTime(t Type, s, g string, now simple.Time) Event {
+       return &baseEvent{t, s, g, now}
 }
diff --git a/pkg/notify/notice_test.go b/pkg/notify/notice_test.go
new file mode 100644
index 0000000..5ad78d2
--- /dev/null
+++ b/pkg/notify/notice_test.go
@@ -0,0 +1,33 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package notify
+
+import (
+       "fmt"
+       "testing"
+)
+
+func TestNewEventWithTime(t *testing.T) {
+       evt := NewEvent(NOTIFTY, "a", "b")
+       if evt.CreateAt().UnixNano() == 0 {
+               t.Fatal("TestNewEventWithTime")
+       }
+       fmt.Println(evt.CreateAt())
+
+       if evt.Type() != NOTIFTY || evt.Subject() != "a" || evt.Group() != "b" {
+               t.Fatal("TestNewEventWithTime")
+       }
+}
diff --git a/pkg/rest/client.go b/pkg/rest/client.go
index 8250637..6eb5843 100644
--- a/pkg/rest/client.go
+++ b/pkg/rest/client.go
@@ -21,12 +21,14 @@ import (
        "crypto/tls"
        "errors"
        "fmt"
+       "github.com/apache/servicecomb-service-center/pkg/buffer"
        "github.com/apache/servicecomb-service-center/pkg/tlsutil"
        "github.com/apache/servicecomb-service-center/pkg/util"
        "golang.org/x/net/context"
        "io"
        "io/ioutil"
        "net/http"
+       "net/http/httputil"
        "net/url"
        "os"
        "strings"
@@ -118,10 +120,15 @@ func (client *URLClient) HttpDoWithContext(ctx context.Context, method string, r
        req = req.WithContext(ctx)
        req.Header = headers
 
+       DumpRequestOut(req)
+
        resp, err = client.Client.Do(req)
        if err != nil {
                return nil, err
        }
+
+       DumpResponse(resp)
+
        switch resp.Header.Get(HEADER_CONTENT_ENCODING) {
        case "gzip":
                reader, err := NewGZipBodyReader(resp.Body)
@@ -133,32 +140,41 @@ func (client *URLClient) HttpDoWithContext(ctx context.Context, method string, r
                resp.Body = reader
        }
 
-       if os.Getenv("DEBUG_MODE") == "1" {
-               fmt.Println("--- BEGIN ---")
-               fmt.Printf("> %s %s %s\n", req.Method, req.URL.RequestURI(), req.Proto)
-               for key, header := range req.Header {
-                       for _, value := range header {
-                               fmt.Printf("> %s: %s\n", key, value)
-                       }
-               }
-               fmt.Println(">")
-               fmt.Println(util.BytesToStringWithNoCopy(body))
-               fmt.Printf("< %s %s\n", resp.Proto, resp.Status)
-               for key, header := range resp.Header {
-                       for _, value := range header {
-                               fmt.Printf("< %s: %s\n", key, value)
-                       }
-               }
-               fmt.Println("<")
-               fmt.Println("--- END ---")
-       }
        return resp, nil
 }
 
+func DumpRequestOut(req *http.Request) {
+       if req == nil || !util.StringTRUE(os.Getenv("DEBUG_MODE")) {
+               return
+       }
+
+       b, _ := httputil.DumpRequestOut(req, true)
+       buffer.ReadLine(bytes.NewBuffer(b), func(line string) bool {
+               fmt.Println(">", line)
+               return true
+       })
+}
+
+func DumpResponse(resp *http.Response) {
+       if resp == nil || !util.StringTRUE(os.Getenv("DEBUG_MODE")) {
+               return
+       }
+
+       b, _ := httputil.DumpResponse(resp, true)
+       buffer.ReadLine(bytes.NewBuffer(b), func(line string) bool {
+               fmt.Println("<", line)
+               return true
+       })
+}
+
 func (client *URLClient) HttpDo(method string, rawURL string, headers http.Header, body []byte) (resp *http.Response, err error) {
        return client.HttpDoWithContext(context.Background(), method, rawURL, headers, body)
 }
 
+func DefaultURLClientOption() URLClientOption {
+       return defaultURLClientOption
+}
+
 func setOptionDefaultValue(o *URLClientOption) URLClientOption {
        if o == nil {
                return defaultURLClientOption
diff --git a/server/notify/listwatcher.go b/server/notify/listwatcher.go
index 0b831d7..b5c5394 100644
--- a/server/notify/listwatcher.go
+++ b/server/notify/listwatcher.go
@@ -20,6 +20,7 @@ import (
        "github.com/apache/servicecomb-service-center/pkg/gopool"
        "github.com/apache/servicecomb-service-center/pkg/log"
        "github.com/apache/servicecomb-service-center/pkg/notify"
+       simple "github.com/apache/servicecomb-service-center/pkg/time"
        pb "github.com/apache/servicecomb-service-center/server/core/proto"
        "golang.org/x/net/context"
        "time"
@@ -124,18 +125,26 @@ func (w *InstanceEventListWatcher) Close() {
        close(w.Job)
 }
 
-func NewInstanceEvent(group, subject string, rev int64, response *pb.WatchInstanceResponse) *InstanceEvent {
+func NewInstanceEvent(serviceId, domainProject string, rev int64, response *pb.WatchInstanceResponse) *InstanceEvent {
        return &InstanceEvent{
-               Event:    notify.NewEvent(INSTANCE, subject, group),
+               Event:    notify.NewEvent(INSTANCE, domainProject, serviceId),
                Revision: rev,
                Response: response,
        }
 }
 
-func NewInstanceEventListWatcher(group string, subject string,
+func NewInstanceEventWithTime(serviceId, domainProject string, rev int64, createAt simple.Time, response *pb.WatchInstanceResponse) *InstanceEvent {
+       return &InstanceEvent{
+               Event:    notify.NewEventWithTime(INSTANCE, domainProject, serviceId, createAt),
+               Revision: rev,
+               Response: response,
+       }
+}
+
+func NewInstanceEventListWatcher(serviceId, domainProject string,
        listFunc func() (results []*pb.WatchInstanceResponse, rev int64)) *InstanceEventListWatcher {
        watcher := &InstanceEventListWatcher{
-               Subscriber: notify.NewSubscriber(INSTANCE, subject, group),
+               Subscriber: notify.NewSubscriber(INSTANCE, domainProject, serviceId),
                Job:        make(chan *InstanceEvent, INSTANCE.QueueSize()),
                ListFunc:   listFunc,
                listCh:     make(chan struct{}),
diff --git a/server/notify/metrics.go b/server/notify/metrics.go
index eac4c52..94fa7d9 100644
--- a/server/notify/metrics.go
+++ b/server/notify/metrics.go
@@ -16,6 +16,7 @@
 package notify
 
 import (
+       "github.com/apache/servicecomb-service-center/pkg/notify"
        "github.com/apache/servicecomb-service-center/server/metric"
        "github.com/prometheus/client_golang/prometheus"
        "time"
@@ -49,13 +50,13 @@ func init() {
        prometheus.MustRegister(notifyCounter, notifyLatency)
 }
 
-func ReportPublishCompleted(source string, err error, start time.Time) {
+func ReportPublishCompleted(evt notify.Event, err error) {
        instance := metric.InstanceName()
-       elapsed := float64(time.Since(start).Nanoseconds()) / float64(time.Microsecond)
+       elapsed := float64(time.Since(evt.CreateAt()).Nanoseconds()) / float64(time.Microsecond)
        status := success
        if err != nil {
                status = failure
        }
-       notifyLatency.WithLabelValues(instance, source, status).Observe(elapsed)
-       notifyCounter.WithLabelValues(instance, source, status).Inc()
+       notifyLatency.WithLabelValues(instance, evt.Type().String(), status).Observe(elapsed)
+       notifyCounter.WithLabelValues(instance, evt.Type().String(), status).Inc()
 }
diff --git a/server/notify/stream.go b/server/notify/stream.go
index 0ac6403..3fe7371 100644
--- a/server/notify/stream.go
+++ b/server/notify/stream.go
@@ -49,7 +49,9 @@ func HandleWatchJob(watcher *InstanceEventListWatcher, stream pb.ServiceInstance
                                watcher.Subject(), watcher.Group())
 
                        err = stream.Send(resp)
-                       ReportPublishCompleted(INSTANCE.String(), err, job.CreateAt())
+                       if job != nil {
+                               ReportPublishCompleted(job, err)
+                       }
                        if err != nil {
                                log.Errorf(err, "send message error, subject: %s, group: %s",
                                        watcher.Subject(), watcher.Group())
diff --git a/server/notify/stream_test.go b/server/notify/stream_test.go
index 60bd8d9..3280bf6 100644
--- a/server/notify/stream_test.go
+++ b/server/notify/stream_test.go
@@ -18,21 +18,36 @@ package notify
 
 import (
        "github.com/apache/servicecomb-service-center/pkg/log"
+       simple "github.com/apache/servicecomb-service-center/pkg/time"
+       pb "github.com/apache/servicecomb-service-center/server/core/proto"
        "golang.org/x/net/context"
+       "google.golang.org/grpc"
        "testing"
+       "time"
 )
 
+type grpcWatchServer struct {
+       grpc.ServerStream
+}
+
+func (x *grpcWatchServer) Send(m *pb.WatchInstanceResponse) error {
+       return nil
+}
+
+func (x *grpcWatchServer) Context() context.Context {
+       return context.Background()
+}
+
 func TestHandleWatchJob(t *testing.T) {
-       defer log.Recover()
        w := NewInstanceEventListWatcher("g", "s", nil)
        w.Job <- nil
-       err := HandleWatchJob(w, nil)
+       err := HandleWatchJob(w, &grpcWatchServer{})
        if err == nil {
                t.Fatalf("TestHandleWatchJob failed")
        }
-       w.Job <- NewInstanceEvent("g", "s", 1, nil)
-       err = HandleWatchJob(w, nil)
-       t.Fatalf("TestHandleWatchJob failed")
+       w.Job <- NewInstanceEventWithTime("g", "s", 1, simple.FromTime(time.Now()), nil)
+       w.Job <- nil
+       HandleWatchJob(w, &grpcWatchServer{})
 }
 
 func TestDoStreamListAndWatch(t *testing.T) {
diff --git a/server/notify/websocket.go b/server/notify/websocket.go
index cc789e8..033ab12 100644
--- a/server/notify/websocket.go
+++ b/server/notify/websocket.go
@@ -21,7 +21,6 @@ import (
        "fmt"
        "github.com/apache/servicecomb-service-center/pkg/log"
        "github.com/apache/servicecomb-service-center/pkg/util"
-       apt "github.com/apache/servicecomb-service-center/server/core"
        pb "github.com/apache/servicecomb-service-center/server/core/proto"
        serviceUtil "github.com/apache/servicecomb-service-center/server/service/util"
        "github.com/gorilla/websocket"
@@ -239,7 +238,7 @@ func (wh *WebSocket) HandleWatchWebSocketJob(o interface{}) {
 
        err := wh.WriteMessage(message)
        if job != nil {
-               ReportPublishCompleted(INSTANCE.String(), err, job.CreateAt())
+               ReportPublishCompleted(job, err)
        }
        if err != nil {
                log.Errorf(err, "watcher[%s] catch an err, subject: %s, group: %s",
@@ -272,7 +271,7 @@ func DoWebSocketListAndWatch(ctx context.Context, serviceId 
string, f func() ([]
        socket := &WebSocket{
                ctx:     ctx,
                conn:    conn,
-               watcher: NewInstanceEventListWatcher(serviceId, apt.GetInstanceRootKey(domainProject)+"/", f),
+               watcher: NewInstanceEventListWatcher(serviceId, domainProject, f),
        }
        process(socket)
 }
diff --git a/server/plugin/pkg/discovery/cacher.go b/server/plugin/pkg/discovery/cacher.go
index 086d928..9b19719 100644
--- a/server/plugin/pkg/discovery/cacher.go
+++ b/server/plugin/pkg/discovery/cacher.go
@@ -40,7 +40,7 @@ func (c *CommonCacher) Notify(action proto.EventType, key string, kv *KeyValue)
        default:
                c.cache.Put(key, kv)
        }
-       c.OnEvent(KvEvent{Type: action, KV: kv, Revision: kv.ModRevision})
+       c.OnEvent(NewKvEvent(action, kv, kv.ModRevision))
 }
 
 func (c *CommonCacher) OnEvent(evt KvEvent) {
diff --git a/server/plugin/pkg/discovery/etcd/cacher_kv.go b/server/plugin/pkg/discovery/etcd/cacher_kv.go
index 20596c1..323e1d8 100644
--- a/server/plugin/pkg/discovery/etcd/cacher_kv.go
+++ b/server/plugin/pkg/discovery/etcd/cacher_kv.go
@@ -132,7 +132,7 @@ func (c *KvCacher) handleWatcher(watcher Watcher) error {
                rev := resp.Revision
                evts := make([]discovery.KvEvent, 0, len(resp.Kvs))
                for _, kv := range resp.Kvs {
-                       evt := discovery.KvEvent{Revision: kv.ModRevision}
+                       evt := discovery.NewKvEvent(proto.EVT_CREATE, nil, kv.ModRevision)
                        switch {
                        case resp.Action == registry.Put && kv.Version == 1:
                                evt.Type, evt.KV = proto.EVT_CREATE, c.doParse(kv)
@@ -252,11 +252,7 @@ func (c *KvCacher) filterDelete(newStore map[string]*mvccpb.KeyValue,
                        i = 0
                }
 
-               block[i] = discovery.KvEvent{
-                       Revision: rev,
-                       Type:     proto.EVT_DELETE,
-                       KV:       v,
-               }
+               block[i] = discovery.NewKvEvent(proto.EVT_DELETE, v, rev)
                i++
                return
        })
@@ -283,11 +279,7 @@ func (c *KvCacher) filterCreateOrUpdate(newStore map[string]*mvccpb.KeyValue,
                        }
 
                        if kv := c.doParse(v); kv != nil {
-                               block[i] = discovery.KvEvent{
-                                       Revision: rev,
-                                       Type:     proto.EVT_CREATE,
-                                       KV:       kv,
-                               }
+                               block[i] = discovery.NewKvEvent(proto.EVT_CREATE, kv, rev)
                                i++
                        }
                        continue
@@ -304,11 +296,7 @@ func (c *KvCacher) filterCreateOrUpdate(newStore map[string]*mvccpb.KeyValue,
                }
 
                if kv := c.doParse(v); kv != nil {
-                       block[i] = discovery.KvEvent{
-                               Revision: rev,
-                               Type:     proto.EVT_UPDATE,
-                               KV:       kv,
-                       }
+                       block[i] = discovery.NewKvEvent(proto.EVT_UPDATE, kv, rev)
                        i++
                }
        }
@@ -368,7 +356,6 @@ func (c *KvCacher) deferHandle(ctx context.Context) {
 }
 
 func (c *KvCacher) onEvents(evts []discovery.KvEvent) {
-       start := time.Now()
        init := !c.IsReady()
        for i, evt := range evts {
                key := util.BytesToStringWithNoCopy(evt.KV.Key)
@@ -406,7 +393,7 @@ func (c *KvCacher) onEvents(evts []discovery.KvEvent) {
 
        c.notify(evts)
 
-       discovery.ReportProcessEventCompleted(evts, start)
+       discovery.ReportProcessEventCompleted(c.Cfg.Key, evts)
 }
 
 func (c *KvCacher) notify(evts []discovery.KvEvent) {
diff --git a/server/plugin/pkg/discovery/metrics.go b/server/plugin/pkg/discovery/metrics.go
index 4e3737b..16681e8 100644
--- a/server/plugin/pkg/discovery/metrics.go
+++ b/server/plugin/pkg/discovery/metrics.go
@@ -22,11 +22,6 @@ import (
        "time"
 )
 
-const (
-       success = "SUCCESS"
-       failure = "FAILURE"
-)
-
 var (
        eventsCounter = prometheus.NewGaugeVec(
                prometheus.GaugeOpts{
@@ -34,7 +29,7 @@ var (
                        Subsystem: "db",
                        Name:      "backend_event_total",
                        Help:      "Counter of backend events",
-               }, []string{"instance"})
+               }, []string{"instance", "prefix"})
 
        eventsLatency = prometheus.NewSummaryVec(
                prometheus.SummaryOpts{
@@ -43,20 +38,23 @@ var (
                        Name:       "backend_event_durations_microseconds",
                        Help:       "Latency of backend events processing",
                        Objectives: prometheus.DefObjectives,
-               }, []string{"instance"})
+               }, []string{"instance", "prefix"})
 )
 
 func init() {
        prometheus.MustRegister(eventsCounter, eventsLatency)
 }
 
-func ReportProcessEventCompleted(evts []KvEvent, start time.Time) {
+func ReportProcessEventCompleted(prefix string, evts []KvEvent) {
        l := float64(len(evts))
        if l == 0 {
                return
        }
        instance := metric.InstanceName()
-       elapsed := float64(time.Since(start).Nanoseconds()) / float64(time.Microsecond)
-       eventsLatency.WithLabelValues(instance).Observe(elapsed / l)
-       eventsCounter.WithLabelValues(instance).Add(l)
+       now := time.Now()
+       for _, evt := range evts {
+               elapsed := float64(now.Sub(evt.CreateAt.Local()).Nanoseconds()) / float64(time.Microsecond)
+               eventsLatency.WithLabelValues(instance, prefix).Observe(elapsed)
+       }
+       eventsCounter.WithLabelValues(instance, prefix).Add(l)
 }
diff --git a/server/plugin/pkg/discovery/types.go b/server/plugin/pkg/discovery/types.go
index 1b759e7..d83856d 100644
--- a/server/plugin/pkg/discovery/types.go
+++ b/server/plugin/pkg/discovery/types.go
@@ -18,10 +18,12 @@ package discovery
 import (
        "encoding/json"
        "fmt"
+       simple "github.com/apache/servicecomb-service-center/pkg/time"
        "github.com/apache/servicecomb-service-center/pkg/util"
        pb "github.com/apache/servicecomb-service-center/server/core/proto"
        "github.com/apache/servicecomb-service-center/server/plugin/pkg/registry"
        "strconv"
+       "time"
 )
 
 var (
@@ -85,6 +87,7 @@ type KvEvent struct {
        Revision int64
        Type     pb.EventType
        KV       *KeyValue
+       CreateAt simple.Time
 }
 
 type KvEventFunc func(evt KvEvent)
@@ -93,3 +96,7 @@ type KvEventHandler interface {
        Type() Type
        OnEvent(evt KvEvent)
 }
+
+func NewKvEvent(action pb.EventType, kv *KeyValue, rev int64) KvEvent {
+       return KvEvent{Type: action, KV: kv, Revision: rev, CreateAt: simple.FromTime(time.Now())}
+}
diff --git a/server/service/event/instance_event_handler.go b/server/service/event/instance_event_handler.go
index e82ec04..e4b99cf 100644
--- a/server/service/event/instance_event_handler.go
+++ b/server/service/event/instance_event_handler.go
@@ -88,15 +88,14 @@ func (h *InstanceEventHandler) OnEvent(evt discovery.KvEvent) {
                return
        }
 
-       PublishInstanceEvent(domainProject, action, pb.MicroServiceToKey(domainProject, ms),
-               instance, evt.Revision, consumerIds)
+       PublishInstanceEvent(evt, domainProject, pb.MicroServiceToKey(domainProject, ms), consumerIds)
 }
 
 func NewInstanceEventHandler() *InstanceEventHandler {
        return &InstanceEventHandler{}
 }
 
-func PublishInstanceEvent(domainProject string, action pb.EventType, serviceKey *pb.MicroServiceKey, instance *pb.MicroServiceInstance, rev int64, subscribers []string) {
+func PublishInstanceEvent(evt discovery.KvEvent, domainProject string, serviceKey *pb.MicroServiceKey, subscribers []string) {
        defer cache.FindInstances.Remove(serviceKey)
 
        if len(subscribers) == 0 {
@@ -105,13 +104,13 @@ func PublishInstanceEvent(domainProject string, action pb.EventType, serviceKey
 
        response := &pb.WatchInstanceResponse{
                Response: pb.CreateResponse(pb.Response_SUCCESS, "Watch instance successfully."),
-               Action:   string(action),
+               Action:   string(evt.Type),
                Key:      serviceKey,
-               Instance: instance,
+               Instance: evt.KV.Value.(*pb.MicroServiceInstance),
        }
        for _, consumerId := range subscribers {
                // TODO add超时怎么处理?
-               job := notify.NewInstanceEvent(consumerId, apt.GetInstanceRootKey(domainProject)+"/", rev, response)
+               job := notify.NewInstanceEventWithTime(consumerId, domainProject, evt.Revision, evt.CreateAt, response)
                notify.NotifyCenter().Publish(job)
        }
 }
diff --git a/server/service/event/rule_event_handler.go b/server/service/event/rule_event_handler.go
index 58051d9..6ac3a28 100644
--- a/server/service/event/rule_event_handler.go
+++ b/server/service/event/rule_event_handler.go
@@ -30,12 +30,13 @@ import (
 )
 
 type RulesChangedTask struct {
+       discovery.KvEvent
+
        key string
        err error
 
        DomainProject string
        ProviderId    string
-       Rev           int64
 }
 
 func (apt *RulesChangedTask) Key() string {
@@ -43,7 +44,7 @@ func (apt *RulesChangedTask) Key() string {
 }
 
 func (apt *RulesChangedTask) Do(ctx context.Context) error {
-       apt.err = apt.publish(ctx, apt.DomainProject, apt.ProviderId, apt.Rev)
+       apt.err = apt.publish(ctx, apt.DomainProject, apt.ProviderId)
        return apt.err
 }
 
@@ -51,7 +52,7 @@ func (apt *RulesChangedTask) Err() error {
        return apt.err
 }
 
-func (apt *RulesChangedTask) publish(ctx context.Context, domainProject, providerId string, rev int64) error {
+func (apt *RulesChangedTask) publish(ctx context.Context, domainProject, providerId string) error {
        ctx = context.WithValue(context.WithValue(ctx,
                serviceUtil.CTX_CACHEONLY, "1"),
                serviceUtil.CTX_GLOBAL, "1")
@@ -74,7 +75,7 @@ func (apt *RulesChangedTask) publish(ctx context.Context, domainProject, provide
        }
        providerKey := pb.MicroServiceToKey(domainProject, provider)
 
-       PublishInstanceEvent(domainProject, pb.EVT_EXPIRE, providerKey, nil, rev, consumerIds)
+       PublishInstanceEvent(apt.KvEvent, domainProject, providerKey, consumerIds)
        return nil
 }
 
@@ -100,18 +101,19 @@ func (h *RuleEventHandler) OnEvent(evt discovery.KvEvent) {
        log.Infof("caught [%s] service rule[%s/%s] event", action, providerId, ruleId)
 
        task.Service().Add(context.Background(),
-               NewRulesChangedAsyncTask(domainProject, providerId, evt.Revision))
+               NewRulesChangedAsyncTask(domainProject, providerId, evt))
 }
 
 func NewRuleEventHandler() *RuleEventHandler {
        return &RuleEventHandler{}
 }
 
-func NewRulesChangedAsyncTask(domainProject, providerId string, rev int64) *RulesChangedTask {
+func NewRulesChangedAsyncTask(domainProject, providerId string, evt discovery.KvEvent) *RulesChangedTask {
+       evt.Type = pb.EVT_EXPIRE
        return &RulesChangedTask{
+               KvEvent:       evt,
                key:           "RulesChangedAsyncTask_" + providerId,
                DomainProject: domainProject,
                ProviderId:    providerId,
-               Rev:           rev,
        }
 }
diff --git a/server/service/event/tag_event_handler.go b/server/service/event/tag_event_handler.go
index 7059e8f..05f4ecb 100644
--- a/server/service/event/tag_event_handler.go
+++ b/server/service/event/tag_event_handler.go
@@ -31,12 +31,13 @@ import (
 )
 
 type TagsChangedTask struct {
+       discovery.KvEvent
+
        key string
        err error
 
        DomainProject string
-       consumerId    string
-       Rev           int64
+       ConsumerId    string
 }
 
 func (apt *TagsChangedTask) Key() string {
@@ -44,7 +45,7 @@ func (apt *TagsChangedTask) Key() string {
 }
 
 func (apt *TagsChangedTask) Do(ctx context.Context) error {
-       apt.err = apt.publish(ctx, apt.DomainProject, apt.consumerId, apt.Rev)
+       apt.err = apt.publish(ctx, apt.DomainProject, apt.ConsumerId)
        return apt.err
 }
 
@@ -52,7 +53,7 @@ func (apt *TagsChangedTask) Err() error {
        return apt.err
 }
 
-func (apt *TagsChangedTask) publish(ctx context.Context, domainProject, consumerId string, rev int64) error {
+func (apt *TagsChangedTask) publish(ctx context.Context, domainProject, consumerId string) error {
        ctx = context.WithValue(context.WithValue(ctx,
                serviceUtil.CTX_CACHEONLY, "1"),
                serviceUtil.CTX_GLOBAL, "1")
@@ -86,7 +87,7 @@ func (apt *TagsChangedTask) publish(ctx context.Context, domainProject, consumer
                }
 
                providerKey := pb.MicroServiceToKey(domainProject, provider)
-               PublishInstanceEvent(domainProject, pb.EVT_EXPIRE, providerKey, nil, rev, []string{consumerId})
+               PublishInstanceEvent(apt.KvEvent, domainProject, providerKey, []string{consumerId})
        }
        return nil
 }
@@ -114,18 +115,19 @@ func (h *TagEventHandler) OnEvent(evt discovery.KvEvent) {
        log.Infof("caught [%s] service tags[%s/%s] event", action, consumerId, evt.KV.Value)
 
        task.Service().Add(context.Background(),
-               NewTagsChangedAsyncTask(domainProject, consumerId, evt.Revision))
+               NewTagsChangedAsyncTask(domainProject, consumerId, evt))
 }
 
 func NewTagEventHandler() *TagEventHandler {
        return &TagEventHandler{}
 }
 
-func NewTagsChangedAsyncTask(domainProject, consumerId string, rev int64) *TagsChangedTask {
+func NewTagsChangedAsyncTask(domainProject, consumerId string, evt discovery.KvEvent) *TagsChangedTask {
+       evt.Type = pb.EVT_EXPIRE
        return &TagsChangedTask{
+               KvEvent:       evt,
                key:           "TagsChangedAsyncTask_" + consumerId,
                DomainProject: domainProject,
-               consumerId:    consumerId,
-               Rev:           rev,
+               ConsumerId:    consumerId,
        }
 }

Reply via email to