This is an automated email from the ASF dual-hosted git repository.

tianxiaoliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/servicecomb-service-center.git


The following commit(s) were added to refs/heads/master by this push:
     new 2f646af  SCB-2094 Add heartbeat ws interface (#962)
2f646af is described below

commit 2f646aff33167799d25ebf4e1bbc5322b9a579f9
Author: robotljw <[email protected]>
AuthorDate: Fri May 21 14:05:37 2021 +0800

    SCB-2094 Add heartbeat ws interface (#962)
---
 datasource/etcd/ms.go                         |  15 +++
 datasource/etcd/util/instance_util.go         |  13 ++
 datasource/instance_test.go                   |  74 ++++++++++++
 datasource/mongo/client/dao/instance.go       |   5 +
 datasource/mongo/heartbeat/cache/heartbeat.go |   6 +-
 datasource/mongo/mongo.go                     |   2 +-
 datasource/mongo/ms.go                        |  15 +++
 datasource/ms.go                              |   2 +
 docs/user-guides.rst                          |   1 +
 docs/user-guides/data-source.rst              |  27 -----
 docs/user-guides/heartbeat.rst                |  48 ++++++++
 etc/conf/app.yaml                             |  26 ++--
 pkg/proto/service_ex.go                       |   2 +
 pkg/rbacframe/api.go                          |   3 +-
 server/alarm/common.go                        |   1 +
 server/connection/hbws/websocket.go           | 166 ++++++++++++++++++++++++++
 server/connection/hbws/websocket_test.go      |  67 +++++++++++
 server/connection/ws/websocket.go             |   5 +-
 server/connection/ws/websocket_test.go        |   2 +-
 server/event/instance_subscriber.go           |   1 +
 server/metrics/connection.go                  |   3 +-
 server/rest/controller/v4/instance_watcher.go |  14 +++
 server/server.go                              |   3 +-
 server/service/watch.go                       |  33 ++++-
 24 files changed, 484 insertions(+), 50 deletions(-)

diff --git a/datasource/etcd/ms.go b/datasource/etcd/ms.go
index 8455400..ea73322 100644
--- a/datasource/etcd/ms.go
+++ b/datasource/etcd/ms.go
@@ -659,6 +659,21 @@ func (ds *DataSource) RegisterInstance(ctx 
context.Context, request *pb.Register
        }, nil
 }
 
+func (ds *DataSource) ExistInstanceByID(ctx context.Context, request 
*pb.MicroServiceInstanceKey) (*pb.GetExistenceByIDResponse, error) {
+       domainProject := util.ParseDomainProject(ctx)
+       exist, _ := serviceUtil.InstanceExist(ctx, domainProject, 
request.ServiceId, request.InstanceId)
+       if !exist {
+               return &pb.GetExistenceByIDResponse{
+                       Response: pb.CreateResponse(pb.ErrInstanceNotExists, 
"Check instance exist failed."),
+                       Exist:    false,
+               }, datasource.ErrInstanceNotExists
+       }
+       return &pb.GetExistenceByIDResponse{
+               Response: pb.CreateResponse(pb.ResponseSuccess, "Check service 
exists successfully."),
+               Exist:    exist,
+       }, nil
+}
+
 func (ds *DataSource) GetInstance(ctx context.Context, request 
*pb.GetOneInstanceRequest) (
        *pb.GetOneInstanceResponse, error) {
        domainProject := util.ParseDomainProject(ctx)
diff --git a/datasource/etcd/util/instance_util.go 
b/datasource/etcd/util/instance_util.go
index 9a53e48..66371a0 100644
--- a/datasource/etcd/util/instance_util.go
+++ b/datasource/etcd/util/instance_util.go
@@ -65,6 +65,19 @@ func GetInstance(ctx context.Context, domainProject string, 
serviceID string, in
        return resp.Kvs[0].Value.(*pb.MicroServiceInstance), nil
 }
 
+func InstanceExist(ctx context.Context, domainProject string, serviceID 
string, instanceID string) (bool, error) {
+       key := path.GenerateInstanceKey(domainProject, serviceID, instanceID)
+       opts := append(FromContext(ctx), client.WithStrKey(key))
+       resp, err := kv.Store().Instance().Search(ctx, opts...)
+       if err != nil {
+               return false, err
+       }
+       if resp.Count == 0 {
+               return false, nil
+       }
+       return true, nil
+}
+
 func FormatRevision(revs, counts []int64) (s string) {
        for i, rev := range revs {
                s += fmt.Sprintf("%d.%d,", rev, counts[i])
diff --git a/datasource/instance_test.go b/datasource/instance_test.go
index d9ce98b..261d714 100644
--- a/datasource/instance_test.go
+++ b/datasource/instance_test.go
@@ -1391,3 +1391,77 @@ func TestInstance_Unregister(t *testing.T) {
                assert.NotEqual(t, pb.ResponseSuccess, resp.Response.GetCode())
        })
 }
+
+func TestInstance_Exist(t *testing.T) {
+       var (
+               serviceId  string
+               instanceId string
+       )
+
+       t.Run("register service and instance", func(t *testing.T) {
+               respCreateService, err := 
datasource.Instance().RegisterService(getContext(), &pb.CreateServiceRequest{
+                       Service: &pb.MicroService{
+                               AppId:       "check_exist_instance_ms",
+                               ServiceName: "check_exist_instance_service_ms",
+                               Version:     "1.0.5",
+                               Level:       "FRONT",
+                               Status:      pb.MS_UP,
+                       },
+                       Tags: map[string]string{
+                               "test": "test",
+                       },
+               })
+               assert.NoError(t, err)
+               assert.Equal(t, pb.ResponseSuccess, 
respCreateService.Response.GetCode())
+               serviceId = respCreateService.ServiceId
+
+               respCreateInstance, err := 
datasource.Instance().RegisterInstance(getContext(), 
&pb.RegisterInstanceRequest{
+                       Instance: &pb.MicroServiceInstance{
+                               ServiceId: serviceId,
+                               HostName:  "UT-HOST-MS",
+                               Endpoints: []string{
+                                       "checkExist:127.0.0.2:8080",
+                               },
+                               Status: pb.MSI_UP,
+                       },
+               })
+               assert.NoError(t, err)
+               assert.Equal(t, pb.ResponseSuccess, 
respCreateInstance.Response.GetCode())
+               instanceId = respCreateInstance.InstanceId
+       })
+
+       t.Run("the check instance should exist", func(t *testing.T) {
+               respCheckInstance, err := 
datasource.Instance().ExistInstanceByID(getContext(), 
&pb.MicroServiceInstanceKey{
+                       ServiceId:  serviceId,
+                       InstanceId: instanceId,
+               })
+               assert.NoError(t, err)
+               assert.Equal(t, pb.ResponseSuccess, 
respCheckInstance.Response.GetCode())
+       })
+
+       t.Run("unregister instance", func(t *testing.T) {
+               resp, err := 
datasource.Instance().UnregisterInstance(getContext(), 
&pb.UnregisterInstanceRequest{
+                       ServiceId:  serviceId,
+                       InstanceId: instanceId,
+               })
+               assert.NoError(t, err)
+               assert.Equal(t, pb.ResponseSuccess, resp.Response.GetCode())
+       })
+
+       t.Run("The check instance should not exist", func(t *testing.T) {
+               resp, err := 
datasource.Instance().ExistInstanceByID(getContext(), 
&pb.MicroServiceInstanceKey{
+                       ServiceId:  serviceId,
+                       InstanceId: instanceId,
+               })
+               assert.NotNil(t, err)
+               assert.Equal(t, pb.ErrInstanceNotExists, 
resp.Response.GetCode())
+       })
+
+       t.Run("unregister service", func(t *testing.T) {
+               resp, err := 
datasource.Instance().UnregisterService(getContext(), &pb.DeleteServiceRequest{
+                       ServiceId: serviceId,
+               })
+               assert.NoError(t, err)
+               assert.Equal(t, pb.ResponseSuccess, resp.Response.GetCode())
+       })
+}
diff --git a/datasource/mongo/client/dao/instance.go 
b/datasource/mongo/client/dao/instance.go
index 47a7efa..3b533eb 100644
--- a/datasource/mongo/client/dao/instance.go
+++ b/datasource/mongo/client/dao/instance.go
@@ -102,3 +102,8 @@ func UpdateInstance(ctx context.Context, filter 
interface{}, update interface{},
        }
        return nil
 }
+
+func ExistInstance(ctx context.Context, serviceID string, instanceID string) 
(bool, error) {
+       filter := mutil.NewBasicFilter(ctx, mutil.InstanceServiceID(serviceID), 
mutil.InstanceInstanceID(instanceID))
+       return client.GetMongoClient().DocExist(ctx, model.CollectionInstance, 
filter)
+}
diff --git a/datasource/mongo/heartbeat/cache/heartbeat.go 
b/datasource/mongo/heartbeat/cache/heartbeat.go
index 1c99bb6..2085f12 100644
--- a/datasource/mongo/heartbeat/cache/heartbeat.go
+++ b/datasource/mongo/heartbeat/cache/heartbeat.go
@@ -62,12 +62,12 @@ type cacheConfig struct {
 func configuration() *cacheConfig {
        once.Do(func() {
                cfg.workerNum = runtime.NumCPU()
-               num := config.GetInt("registry.mongo.heartbeat.workerNum", 
defaultWorkNum)
+               num := config.GetInt("heartbeat.workerNum", defaultWorkNum)
                if num != 0 {
                        cfg.workerNum = num
                }
-               cfg.heartbeatTaskTimeout = 
config.GetInt("registry.mongo.heartbeat.timeout", defaultTimeout)
-               cfg.cacheChan = make(chan *instanceHeartbeatInfo, 
config.GetInt("registry.mongo.heartbeat.cacheCapacity", defaultCacheCapacity))
+               cfg.heartbeatTaskTimeout = config.GetInt("heartbeat.timeout", 
defaultTimeout)
+               cfg.cacheChan = make(chan *instanceHeartbeatInfo, 
config.GetInt("heartbeat.cacheCapacity", defaultCacheCapacity))
                cfg.instanceHeartbeatStore = cache.New(0, 
instanceCheckerInternal)
                cfg.instanceHeartbeatStore.OnEvicted(func(k string, v 
interface{}) {
                        instanceInfo, ok := v.(*instanceHeartbeatInfo)
diff --git a/datasource/mongo/mongo.go b/datasource/mongo/mongo.go
index 8d882c7..5136485 100644
--- a/datasource/mongo/mongo.go
+++ b/datasource/mongo/mongo.go
@@ -86,7 +86,7 @@ func (ds *DataSource) initialize() error {
 }
 
 func (ds *DataSource) initPlugins() error {
-       kind := config.GetString("registry.mongo.heartbeat.kind", "cache")
+       kind := config.GetString("heartbeat.kind", "cache")
        err := heartbeat.Init(heartbeat.Options{PluginImplName: 
heartbeat.ImplName(kind)})
        if err != nil {
                log.Fatal("heartbeat init failed", err)
diff --git a/datasource/mongo/ms.go b/datasource/mongo/ms.go
index f300828..32beecf 100644
--- a/datasource/mongo/ms.go
+++ b/datasource/mongo/ms.go
@@ -1530,6 +1530,21 @@ func preProcessRegister(ctx context.Context, instance 
*discovery.MicroServiceIns
        }, true, nil
 }
 
+func (ds *DataSource) ExistInstanceByID(ctx context.Context, request 
*discovery.MicroServiceInstanceKey) (*discovery.GetExistenceByIDResponse, 
error) {
+       exist, _ := dao.ExistInstance(ctx, request.ServiceId, 
request.InstanceId)
+       if !exist {
+               return &discovery.GetExistenceByIDResponse{
+                       Response: 
discovery.CreateResponse(discovery.ErrInstanceNotExists, "Check instance exist 
failed."),
+                       Exist:    false,
+               }, datasource.ErrInstanceNotExists
+       }
+
+       return &discovery.GetExistenceByIDResponse{
+               Response: discovery.CreateResponse(discovery.ResponseSuccess, 
"Check service exists successfully."),
+               Exist:    exist,
+       }, nil
+}
+
 // GetInstance returns instance under the current domain
 func (ds *DataSource) GetInstance(ctx context.Context, request 
*discovery.GetOneInstanceRequest) (*discovery.GetOneInstanceResponse, error) {
        var service *model.Service
diff --git a/datasource/ms.go b/datasource/ms.go
index 5c63200..7e63ee3 100644
--- a/datasource/ms.go
+++ b/datasource/ms.go
@@ -25,6 +25,7 @@ import (
 )
 
 var ErrServiceNotExists = errors.New("service does not exist")
+var ErrInstanceNotExists = errors.New("instance does not exist")
 
 // Attention: request validation must be finished before the following 
interface being invoked!!!
 // MetadataManager contains the CRUD of cache metadata
@@ -50,6 +51,7 @@ type MetadataManager interface {
 
        // Instance management
        RegisterInstance(ctx context.Context, request 
*pb.RegisterInstanceRequest) (*pb.RegisterInstanceResponse, error)
+       ExistInstanceByID(ctx context.Context, request 
*pb.MicroServiceInstanceKey) (*pb.GetExistenceByIDResponse, error)
        // GetInstances returns instances under the specified service
        GetInstance(ctx context.Context, request *pb.GetOneInstanceRequest) 
(*pb.GetOneInstanceResponse, error)
        GetInstances(ctx context.Context, request *pb.GetInstancesRequest) 
(*pb.GetInstancesResponse, error)
diff --git a/docs/user-guides.rst b/docs/user-guides.rst
index dd18ba4..7bba95d 100644
--- a/docs/user-guides.rst
+++ b/docs/user-guides.rst
@@ -8,6 +8,7 @@ User Guides
    user-guides/pr-raising-guide.md
    user-guides/security-tls.md
    user-guides/data-source.rst
+   user-guides/heartbeat.rst
    user-guides/sc-cluster.rst
    user-guides/integration-grafana.rst
    user-guides/rbac.md
diff --git a/docs/user-guides/data-source.rst b/docs/user-guides/data-source.rst
index cf1f340..c7f9da5 100644
--- a/docs/user-guides/data-source.rst
+++ b/docs/user-guides/data-source.rst
@@ -121,17 +121,6 @@ Configure app.yaml according to your needs.
 ::
 
      mongo:
-       heartbeat:
-         # Mongo's heartbeat plugin
-         # heartbeat.kind="checker or cache"
-         # if heartbeat.kind equals to 'cache', should set 
cacheCapacity,workerNum and taskTimeout
-         # capacity = 10000
-         # workerNum = 10
-         # timeout = 10
-         kind: cache
-         cacheCapacity: 10000
-         workerNum: 10
-         timeout: 10
        cluster:
          uri: mongodb://localhost:27017
          sslEnabled: false
@@ -148,22 +137,6 @@ Configure app.yaml according to your needs.
     - description
     - required
     - value
-  * - registry.mongo.heartbeat.kind
-    - there are two types of heartbeat plug-ins. With cache and without cache.
-    - yes
-    - cache/checker
-  * - registry.mongo.heartbeat.cacheCapacity
-    - cache capacity
-    - yes
-    - a integer, like 10000
-  * - registry.mongo.heartbeat.workerNum
-    - the number of working cooperations
-    - yes
-    - a integer, like 10
-  * - registry.mongo.heartbeat.timeout
-    - processing task timeout (default unit: s)
-    - yes
-    - a integer, like 10
   * - registry.mongo.cluster.uri
     - mongodb server address
     - yes
diff --git a/docs/user-guides/heartbeat.rst b/docs/user-guides/heartbeat.rst
new file mode 100644
index 0000000..43e3e71
--- /dev/null
+++ b/docs/user-guides/heartbeat.rst
@@ -0,0 +1,48 @@
+Heartbeat
+========================
+Heartbeat configuration. Configure app.yaml according to your needs.
+
+::
+
+   heartbeat:
+     # configuration of websocket long connection
+     websocket:
+       pingInterval: 30s
+     # heartbeat.kind="checker or cache"
+     # if heartbeat.kind equals to 'cache', should set cacheCapacity,workerNum 
and taskTimeout
+     # capacity = 10000
+     # workerNum = 10
+     # timeout = 10
+     kind: cache
+     cacheCapacity: 10000
+     workerNum: 10
+     timeout: 10
+
+.. list-table::
+  :widths: 15 20 5 10
+  :header-rows: 1
+
+  * - field
+    - description
+    - required
+    - value
+  * - heartbeat.websocket.pingInterval
+    - websocket ping interval.
+    - yes
+    - like 30s
+  * - heartbeat.kind
+    - there are two types of heartbeat plug-ins. With cache and without cache.
+    - yes
+    - cache/checker
+  * - heartbeat.cacheCapacity
+    - cache capacity
+    - yes
+    - a integer, like 10000
+  * - heartbeat.workerNum
+    - the number of working cooperations
+    - yes
+    - a integer, like 10
+  * - heartbeat.timeout
+    - processing task timeout (default unit: s)
+    - yes
+    - a integer, like 10
\ No newline at end of file
diff --git a/etc/conf/app.yaml b/etc/conf/app.yaml
index ea3cf24..1c05844 100644
--- a/etc/conf/app.yaml
+++ b/etc/conf/app.yaml
@@ -114,17 +114,6 @@ registry:
     request:
       timeout: 30s
   mongo:
-    heartbeat:
-      # Mongo's heartbeat plugin
-      # heartbeat.kind="checker or cache"
-      # if heartbeat.kind equals to 'cache', should set 
cacheCapacity,workerNum and taskTimeout
-      # capacity = 10000
-      # workerNum = 10
-      # timeout = 10
-      kind: cache
-      cacheCapacity: 10000
-      workerNum: 10
-      timeout: 10
     cluster:
       uri: mongodb://127.0.0.1:27017
       sslEnabled: false
@@ -209,3 +198,18 @@ auditlog:
 
 syncer:
   enabled: false
+
+heartbeat:
+  # configuration of websocket long connection
+  websocket:
+    pingInterval: 30s
+  # heartbeat.kind="checker or cache"
+  # if heartbeat.kind equals to 'cache', should set cacheCapacity,workerNum 
and taskTimeout
+  # capacity = 10000
+  # workerNum = 10
+  # timeout = 10
+  kind: cache
+  cacheCapacity: 10000
+  workerNum: 10
+  timeout: 10
+
diff --git a/pkg/proto/service_ex.go b/pkg/proto/service_ex.go
index 64c0b9e..1d49227 100644
--- a/pkg/proto/service_ex.go
+++ b/pkg/proto/service_ex.go
@@ -31,5 +31,7 @@ type ServiceInstanceCtrlServerEx interface {
 
        WebSocketWatch(ctx context.Context, in *discovery.WatchInstanceRequest, 
conn *websocket.Conn)
 
+       WatchHeartbeat(ctx context.Context, in *discovery.HeartbeatRequest, 
conn *websocket.Conn)
+
        ClusterHealth(ctx context.Context) (*discovery.GetInstancesResponse, 
error)
 }
diff --git a/pkg/rbacframe/api.go b/pkg/rbacframe/api.go
index 8dc46b7..57b8846 100644
--- a/pkg/rbacframe/api.go
+++ b/pkg/rbacframe/api.go
@@ -20,11 +20,12 @@ package rbacframe
 
 import (
        "crypto/rsa"
+
        "github.com/go-chassis/cari/rbac"
+       "k8s.io/apimachinery/pkg/util/sets"
 
        "github.com/apache/servicecomb-service-center/pkg/util"
        "github.com/go-chassis/go-chassis/v2/security/token"
-       "k8s.io/apimachinery/pkg/util/sets"
 )
 
 const (
diff --git a/server/alarm/common.go b/server/alarm/common.go
index cb9df7a..2989ec9 100644
--- a/server/alarm/common.go
+++ b/server/alarm/common.go
@@ -17,6 +17,7 @@ package alarm
 
 import (
        "fmt"
+
        "github.com/apache/servicecomb-service-center/pkg/event"
        "github.com/apache/servicecomb-service-center/server/alarm/model"
 )
diff --git a/server/connection/hbws/websocket.go 
b/server/connection/hbws/websocket.go
new file mode 100644
index 0000000..314f426
--- /dev/null
+++ b/server/connection/hbws/websocket.go
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package hbws
+
+import (
+       "context"
+       "fmt"
+       "sync"
+       "time"
+
+       pb "github.com/go-chassis/cari/discovery"
+       "github.com/gorilla/websocket"
+
+       "github.com/apache/servicecomb-service-center/pkg/log"
+       "github.com/apache/servicecomb-service-center/pkg/util"
+       "github.com/apache/servicecomb-service-center/server/config"
+       "github.com/apache/servicecomb-service-center/server/connection"
+       "github.com/apache/servicecomb-service-center/server/core"
+       "github.com/apache/servicecomb-service-center/server/metrics"
+)
+
+const (
+       Websocket         = "Websocket"
+       defaultPingPeriod = 30 * time.Second
+       minPeriod         = 1 * time.Second
+       maxPeriod         = 1 * time.Hour
+)
+
+var (
+       once       sync.Once
+       pingPeriod time.Duration
+)
+
+type client struct {
+       cxt        context.Context
+       conn       *websocket.Conn
+       serviceID  string
+       instanceID string
+}
+
+func configuration() {
+       once.Do(func() {
+               pingPeriod = 
config.GetDuration("heartbeat.websocket.pingInterval", defaultPingPeriod)
+               if pingPeriod < minPeriod || pingPeriod > maxPeriod {
+                       pingPeriod = defaultPingPeriod
+               }
+       })
+}
+
+func newClient(ctx context.Context, conn *websocket.Conn, serviceID string, 
instanceID string) *client {
+       configuration()
+       return &client{
+               cxt:        ctx,
+               conn:       conn,
+               serviceID:  serviceID,
+               instanceID: instanceID,
+       }
+}
+
+func (c *client) sendClose(code int, text string) error {
+       remoteAddr := c.conn.RemoteAddr().String()
+       var message []byte
+       if code != websocket.CloseNoStatusReceived {
+               message = websocket.FormatCloseMessage(code, text)
+       }
+       err := c.conn.WriteControl(websocket.CloseMessage, message, 
time.Now().Add(connection.SendTimeout))
+       if err != nil {
+               log.Error(fmt.Sprintf("watcher[%s] catch an err", remoteAddr), 
err)
+               return err
+       }
+       return nil
+}
+
+func (c *client) heartbeat() {
+       remoteAddr := c.conn.RemoteAddr().String()
+       ticker := time.NewTicker(pingPeriod)
+       defer func() {
+               ticker.Stop()
+               c.conn.Close()
+       }()
+       for {
+               <-ticker.C
+               err := 
c.conn.SetWriteDeadline(time.Now().Add(connection.SendTimeout))
+               if err != nil {
+                       log.Error("", err)
+               }
+               if err := c.conn.WriteMessage(websocket.PingMessage, nil); err 
!= nil {
+                       log.Error(fmt.Sprintf("send 'Ping' message to 
watcher[%s] failed", remoteAddr), err)
+                       return
+               }
+       }
+}
+
+func (c *client) handleMessage() {
+       defer func() {
+               c.conn.Close()
+       }()
+
+       remoteAddr := c.conn.RemoteAddr().String()
+       c.conn.SetPongHandler(func(message string) error {
+               err := 
c.conn.SetReadDeadline(time.Now().Add(connection.ReadTimeout))
+               if err != nil {
+                       log.Error("", err)
+               }
+               log.Infof("received 'Pong' message '%s' from watcher[%s]\n", 
message, remoteAddr)
+               request := &pb.HeartbeatRequest{
+                       ServiceId:  c.serviceID,
+                       InstanceId: c.instanceID,
+               }
+               _, err = core.InstanceAPI.Heartbeat(c.cxt, request)
+               if err != nil {
+                       log.Error("instance heartbeat report failed ", err)
+               }
+               return err
+       })
+
+       c.conn.SetCloseHandler(func(code int, text string) error {
+               log.Info(fmt.Sprintf("watcher[%s] active closed, code: %d, 
message: '%s'", remoteAddr, code, text))
+               return c.sendClose(code, text)
+       })
+       for {
+               _, _, err := c.conn.ReadMessage()
+               if err != nil {
+                       if websocket.IsUnexpectedCloseError(err, 
websocket.CloseGoingAway, websocket.CloseAbnormalClosure) {
+                               log.Error("", err)
+                       }
+                       break
+               }
+       }
+}
+
+func SendEstablishError(conn *websocket.Conn, err error) {
+       remoteAddr := conn.RemoteAddr().String()
+       log.Errorf(err, "establish[%s] websocket failed.", remoteAddr)
+       if err := conn.WriteMessage(websocket.TextMessage, 
util.StringToBytesWithNoCopy(err.Error())); err != nil {
+               log.Errorf(err, "establish[%s] websocket failed: write message 
failed.", remoteAddr)
+       }
+}
+
+func Heartbeat(ctx context.Context, conn *websocket.Conn, serviceID string, 
instanceID string) {
+       domain := util.ParseDomain(ctx)
+       client := newClient(ctx, conn, serviceID, instanceID)
+       metrics.ReportSubscriber(domain, Websocket, 1)
+       process(client)
+       metrics.ReportSubscriber(domain, Websocket, -1)
+}
+
+func process(client *client) {
+       go client.heartbeat()
+       client.handleMessage()
+}
diff --git a/server/connection/hbws/websocket_test.go 
b/server/connection/hbws/websocket_test.go
new file mode 100644
index 0000000..729e780
--- /dev/null
+++ b/server/connection/hbws/websocket_test.go
@@ -0,0 +1,67 @@
+package hbws_test
+
+import (
+       "context"
+       "net/http"
+       "net/http/httptest"
+       "strings"
+       "testing"
+       "time"
+
+       "github.com/gorilla/websocket"
+       "github.com/stretchr/testify/assert"
+
+       "github.com/apache/servicecomb-service-center/server/connection/hbws"
+       "github.com/apache/servicecomb-service-center/server/core"
+)
+
+var closeCh = make(chan struct{})
+
+func init() {
+       testing.Init()
+       core.Initialize()
+}
+
+type watcherConn struct {
+       clientConn *websocket.Conn
+       serverConn *websocket.Conn
+}
+
+func (h *watcherConn) Test() {
+       s := httptest.NewServer(h)
+       h.clientConn, _, _ = websocket.DefaultDialer.Dial(
+               strings.Replace(s.URL, "http://";, "ws://", 1), nil)
+}
+
+func (h *watcherConn) ServeHTTP(w http.ResponseWriter, r *http.Request) {
+       var upgrader = websocket.Upgrader{}
+       h.serverConn, _ = upgrader.Upgrade(w, r, nil)
+       for {
+               //h.ServerConn.WriteControl(websocket.PingMessage, []byte{}, 
time.Now().Add(time.Second))
+               //h.ServerConn.WriteControl(websocket.PongMessage, []byte{}, 
time.Now().Add(time.Second))
+               _, _, err := h.serverConn.ReadMessage()
+               if err != nil {
+                       return
+               }
+               <-closeCh
+               h.serverConn.WriteControl(websocket.CloseMessage, []byte{}, 
time.Now().Add(time.Second))
+               h.serverConn.Close()
+               return
+       }
+}
+
+func NewTest() *watcherConn {
+       ts := &watcherConn{}
+       ts.Test()
+       return ts
+}
+
+func TestHeartbeat(t *testing.T) {
+       mock := NewTest()
+       go hbws.Heartbeat(context.Background(), mock.serverConn, "", "")
+       err := mock.serverConn.WriteMessage(websocket.TextMessage, 
[]byte("hello"))
+       assert.Nil(t, err)
+       _, p, err := mock.clientConn.ReadMessage()
+       assert.Nil(t, err)
+       assert.Equal(t, "hello", string(p))
+}
diff --git a/server/connection/ws/websocket.go 
b/server/connection/ws/websocket.go
index e9fe3c1..0e9a7e0 100644
--- a/server/connection/ws/websocket.go
+++ b/server/connection/ws/websocket.go
@@ -20,9 +20,10 @@ package ws
 import (
        "context"
        "fmt"
-       "github.com/apache/servicecomb-service-center/pkg/util"
        "time"
 
+       "github.com/apache/servicecomb-service-center/pkg/util"
+
        "github.com/apache/servicecomb-service-center/datasource"
        "github.com/apache/servicecomb-service-center/pkg/log"
        "github.com/apache/servicecomb-service-center/server/connection"
@@ -32,7 +33,7 @@ import (
 
 const Websocket = "Websocket"
 
-var errServiceNotExist = fmt.Errorf("Service does not exist.")
+var errServiceNotExist = fmt.Errorf("service does not exist")
 
 type WebSocket struct {
        Options
diff --git a/server/connection/ws/websocket_test.go 
b/server/connection/ws/websocket_test.go
index ca40bd6..7518b5a 100644
--- a/server/connection/ws/websocket_test.go
+++ b/server/connection/ws/websocket_test.go
@@ -159,6 +159,6 @@ func TestWebSocket_CheckHealth(t *testing.T) {
        })
        t.Run("should return err when consumer not exist", func(t *testing.T) {
                ws := wss.NewWebSocket("", "", mock.ServerConn)
-               assert.Equal(t, "Service does not exist.", 
ws.CheckHealth(context.Background()).Error())
+               assert.Equal(t, "service does not exist", 
ws.CheckHealth(context.Background()).Error())
        })
 }
diff --git a/server/event/instance_subscriber.go 
b/server/event/instance_subscriber.go
index 9d5c1c4..fff474b 100644
--- a/server/event/instance_subscriber.go
+++ b/server/event/instance_subscriber.go
@@ -19,6 +19,7 @@ package event
 
 import (
        "fmt"
+
        "github.com/apache/servicecomb-service-center/pkg/event"
        "github.com/apache/servicecomb-service-center/pkg/log"
        "github.com/apache/servicecomb-service-center/server/metrics"
diff --git a/server/metrics/connection.go b/server/metrics/connection.go
index c012980..18366dd 100644
--- a/server/metrics/connection.go
+++ b/server/metrics/connection.go
@@ -18,9 +18,10 @@
 package metrics
 
 import (
-       "github.com/apache/servicecomb-service-center/pkg/event"
        "time"
 
+       "github.com/apache/servicecomb-service-center/pkg/event"
+
        "github.com/apache/servicecomb-service-center/pkg/metrics"
        helper "github.com/apache/servicecomb-service-center/pkg/prometheus"
        "github.com/prometheus/client_golang/prometheus"
diff --git a/server/rest/controller/v4/instance_watcher.go 
b/server/rest/controller/v4/instance_watcher.go
index 9396ee5..c8bf7f1 100644
--- a/server/rest/controller/v4/instance_watcher.go
+++ b/server/rest/controller/v4/instance_watcher.go
@@ -34,6 +34,7 @@ type WatchService struct {
 func (s *WatchService) URLPatterns() []rest.Route {
        return []rest.Route{
                {Method: rest.HTTPMethodGet, Path: 
"/v4/:project/registry/microservices/:serviceId/watcher", Func: s.Watch},
+               {Method: rest.HTTPMethodGet, Path: 
"/v4/:project/registry/microservices/:serviceId/instances/:instanceId/heartbeat",
 Func: s.Heartbeat},
        }
 }
 
@@ -62,3 +63,16 @@ func (s *WatchService) Watch(w http.ResponseWriter, r 
*http.Request) {
                SelfServiceId: r.URL.Query().Get(":serviceId"),
        }, conn)
 }
+
+func (s *WatchService) Heartbeat(w http.ResponseWriter, r *http.Request) {
+       conn, err := upgrade(w, r)
+       if err != nil {
+               log.Error("failed to establish connection", err)
+               return
+       }
+       defer conn.Close()
+       core.InstanceAPI.WatchHeartbeat(r.Context(), &pb.HeartbeatRequest{
+               ServiceId:  r.URL.Query().Get(":serviceId"),
+               InstanceId: r.URL.Query().Get(":instanceId"),
+       }, conn)
+}
diff --git a/server/server.go b/server/server.go
index 8b6f099..9579c28 100644
--- a/server/server.go
+++ b/server/server.go
@@ -19,12 +19,13 @@ package server
 
 import (
        "context"
-       "github.com/apache/servicecomb-service-center/server/event"
        "net"
        "os"
        "strings"
        "time"
 
+       "github.com/apache/servicecomb-service-center/server/event"
+
        "github.com/apache/servicecomb-service-center/datasource"
        nf "github.com/apache/servicecomb-service-center/pkg/event"
        "github.com/apache/servicecomb-service-center/pkg/gopool"
diff --git a/server/service/watch.go b/server/service/watch.go
index 8813b0f..998c35a 100644
--- a/server/service/watch.go
+++ b/server/service/watch.go
@@ -22,13 +22,15 @@ import (
        "errors"
        "fmt"
 
+       pb "github.com/go-chassis/cari/discovery"
+       "github.com/gorilla/websocket"
+
        "github.com/apache/servicecomb-service-center/datasource"
        "github.com/apache/servicecomb-service-center/pkg/log"
        "github.com/apache/servicecomb-service-center/pkg/proto"
        "github.com/apache/servicecomb-service-center/server/connection/grpc"
+       "github.com/apache/servicecomb-service-center/server/connection/hbws"
        "github.com/apache/servicecomb-service-center/server/connection/ws"
-       pb "github.com/go-chassis/cari/discovery"
-       "github.com/gorilla/websocket"
 )
 
 func (s *InstanceService) WatchPreOpera(ctx context.Context, in 
*pb.WatchInstanceRequest) error {
@@ -39,6 +41,7 @@ func (s *InstanceService) WatchPreOpera(ctx context.Context, 
in *pb.WatchInstanc
                ServiceId: in.SelfServiceId,
        })
        if err != nil {
+               log.Error("", err)
                return err
        }
        if !resp.Exist {
@@ -47,6 +50,23 @@ func (s *InstanceService) WatchPreOpera(ctx context.Context, 
in *pb.WatchInstanc
        return nil
 }
 
+func (s *InstanceService) HeartBeatPreOpera(ctx context.Context, in 
*pb.HeartbeatRequest) error {
+       if in == nil || len(in.ServiceId) == 0 || len(in.InstanceId) == 0 {
+               return errors.New("request format invalid")
+       }
+       resp, err := datasource.Instance().ExistInstanceByID(ctx, 
&pb.MicroServiceInstanceKey{
+               ServiceId:  in.ServiceId,
+               InstanceId: in.InstanceId,
+       })
+       if err != nil {
+               return err
+       }
+       if !resp.Exist {
+               return datasource.ErrInstanceNotExists
+       }
+       return nil
+}
+
 func (s *InstanceService) Watch(in *pb.WatchInstanceRequest, stream 
proto.ServiceInstanceCtrlWatchServer) error {
        log.Infof("new a stream list and watch with service[%s]", 
in.SelfServiceId)
        if err := s.WatchPreOpera(stream.Context(), in); err != nil {
@@ -66,6 +86,15 @@ func (s *InstanceService) WebSocketWatch(ctx 
context.Context, in *pb.WatchInstan
        ws.Watch(ctx, in.SelfServiceId, conn)
 }
 
+func (s *InstanceService) WatchHeartbeat(ctx context.Context, in 
*pb.HeartbeatRequest, conn *websocket.Conn) {
+       log.Info(fmt.Sprintf("new a web socket with service[%s] ,instance[%s]", 
in.ServiceId, in.InstanceId))
+       if err := s.HeartBeatPreOpera(ctx, in); err != nil {
+               hbws.SendEstablishError(conn, err)
+               return
+       }
+       hbws.Heartbeat(ctx, conn, in.ServiceId, in.InstanceId)
+}
+
 func (s *InstanceService) QueryAllProvidersInstances(ctx context.Context, in 
*pb.WatchInstanceRequest) ([]*pb.WatchInstanceResponse, int64) {
        depResp, err := datasource.Instance().SearchConsumerDependency(ctx, 
&pb.GetDependenciesRequest{
                ServiceId: in.SelfServiceId,

Reply via email to