This is an automated email from the ASF dual-hosted git repository.
tianxiaoliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/servicecomb-service-center.git
The following commit(s) were added to refs/heads/master by this push:
new 2f646af SCB-2094 Add heartbeat ws interface (#962)
2f646af is described below
commit 2f646aff33167799d25ebf4e1bbc5322b9a579f9
Author: robotljw <[email protected]>
AuthorDate: Fri May 21 14:05:37 2021 +0800
SCB-2094 Add heartbeat ws interface (#962)
---
datasource/etcd/ms.go | 15 +++
datasource/etcd/util/instance_util.go | 13 ++
datasource/instance_test.go | 74 ++++++++++++
datasource/mongo/client/dao/instance.go | 5 +
datasource/mongo/heartbeat/cache/heartbeat.go | 6 +-
datasource/mongo/mongo.go | 2 +-
datasource/mongo/ms.go | 15 +++
datasource/ms.go | 2 +
docs/user-guides.rst | 1 +
docs/user-guides/data-source.rst | 27 -----
docs/user-guides/heartbeat.rst | 48 ++++++++
etc/conf/app.yaml | 26 ++--
pkg/proto/service_ex.go | 2 +
pkg/rbacframe/api.go | 3 +-
server/alarm/common.go | 1 +
server/connection/hbws/websocket.go | 166 ++++++++++++++++++++++++++
server/connection/hbws/websocket_test.go | 67 +++++++++++
server/connection/ws/websocket.go | 5 +-
server/connection/ws/websocket_test.go | 2 +-
server/event/instance_subscriber.go | 1 +
server/metrics/connection.go | 3 +-
server/rest/controller/v4/instance_watcher.go | 14 +++
server/server.go | 3 +-
server/service/watch.go | 33 ++++-
24 files changed, 484 insertions(+), 50 deletions(-)
diff --git a/datasource/etcd/ms.go b/datasource/etcd/ms.go
index 8455400..ea73322 100644
--- a/datasource/etcd/ms.go
+++ b/datasource/etcd/ms.go
@@ -659,6 +659,21 @@ func (ds *DataSource) RegisterInstance(ctx
context.Context, request *pb.Register
}, nil
}
+func (ds *DataSource) ExistInstanceByID(ctx context.Context, request
*pb.MicroServiceInstanceKey) (*pb.GetExistenceByIDResponse, error) {
+ domainProject := util.ParseDomainProject(ctx)
+ exist, _ := serviceUtil.InstanceExist(ctx, domainProject,
request.ServiceId, request.InstanceId)
+ if !exist {
+ return &pb.GetExistenceByIDResponse{
+ Response: pb.CreateResponse(pb.ErrInstanceNotExists,
"Check instance exist failed."),
+ Exist: false,
+ }, datasource.ErrInstanceNotExists
+ }
+ return &pb.GetExistenceByIDResponse{
+ Response: pb.CreateResponse(pb.ResponseSuccess, "Check service
exists successfully."),
+ Exist: exist,
+ }, nil
+}
+
func (ds *DataSource) GetInstance(ctx context.Context, request
*pb.GetOneInstanceRequest) (
*pb.GetOneInstanceResponse, error) {
domainProject := util.ParseDomainProject(ctx)
diff --git a/datasource/etcd/util/instance_util.go
b/datasource/etcd/util/instance_util.go
index 9a53e48..66371a0 100644
--- a/datasource/etcd/util/instance_util.go
+++ b/datasource/etcd/util/instance_util.go
@@ -65,6 +65,19 @@ func GetInstance(ctx context.Context, domainProject string,
serviceID string, in
return resp.Kvs[0].Value.(*pb.MicroServiceInstance), nil
}
+func InstanceExist(ctx context.Context, domainProject string, serviceID
string, instanceID string) (bool, error) {
+ key := path.GenerateInstanceKey(domainProject, serviceID, instanceID)
+ opts := append(FromContext(ctx), client.WithStrKey(key))
+ resp, err := kv.Store().Instance().Search(ctx, opts...)
+ if err != nil {
+ return false, err
+ }
+ if resp.Count == 0 {
+ return false, nil
+ }
+ return true, nil
+}
+
func FormatRevision(revs, counts []int64) (s string) {
for i, rev := range revs {
s += fmt.Sprintf("%d.%d,", rev, counts[i])
diff --git a/datasource/instance_test.go b/datasource/instance_test.go
index d9ce98b..261d714 100644
--- a/datasource/instance_test.go
+++ b/datasource/instance_test.go
@@ -1391,3 +1391,77 @@ func TestInstance_Unregister(t *testing.T) {
assert.NotEqual(t, pb.ResponseSuccess, resp.Response.GetCode())
})
}
+
+func TestInstance_Exist(t *testing.T) {
+ var (
+ serviceId string
+ instanceId string
+ )
+
+ t.Run("register service and instance", func(t *testing.T) {
+ respCreateService, err :=
datasource.Instance().RegisterService(getContext(), &pb.CreateServiceRequest{
+ Service: &pb.MicroService{
+ AppId: "check_exist_instance_ms",
+ ServiceName: "check_exist_instance_service_ms",
+ Version: "1.0.5",
+ Level: "FRONT",
+ Status: pb.MS_UP,
+ },
+ Tags: map[string]string{
+ "test": "test",
+ },
+ })
+ assert.NoError(t, err)
+ assert.Equal(t, pb.ResponseSuccess,
respCreateService.Response.GetCode())
+ serviceId = respCreateService.ServiceId
+
+ respCreateInstance, err :=
datasource.Instance().RegisterInstance(getContext(),
&pb.RegisterInstanceRequest{
+ Instance: &pb.MicroServiceInstance{
+ ServiceId: serviceId,
+ HostName: "UT-HOST-MS",
+ Endpoints: []string{
+ "checkExist:127.0.0.2:8080",
+ },
+ Status: pb.MSI_UP,
+ },
+ })
+ assert.NoError(t, err)
+ assert.Equal(t, pb.ResponseSuccess,
respCreateInstance.Response.GetCode())
+ instanceId = respCreateInstance.InstanceId
+ })
+
+ t.Run("the check instance should exist", func(t *testing.T) {
+ respCheckInstance, err :=
datasource.Instance().ExistInstanceByID(getContext(),
&pb.MicroServiceInstanceKey{
+ ServiceId: serviceId,
+ InstanceId: instanceId,
+ })
+ assert.NoError(t, err)
+ assert.Equal(t, pb.ResponseSuccess,
respCheckInstance.Response.GetCode())
+ })
+
+ t.Run("unregister instance", func(t *testing.T) {
+ resp, err :=
datasource.Instance().UnregisterInstance(getContext(),
&pb.UnregisterInstanceRequest{
+ ServiceId: serviceId,
+ InstanceId: instanceId,
+ })
+ assert.NoError(t, err)
+ assert.Equal(t, pb.ResponseSuccess, resp.Response.GetCode())
+ })
+
+ t.Run("The check instance should not exist", func(t *testing.T) {
+ resp, err :=
datasource.Instance().ExistInstanceByID(getContext(),
&pb.MicroServiceInstanceKey{
+ ServiceId: serviceId,
+ InstanceId: instanceId,
+ })
+ assert.NotNil(t, err)
+ assert.Equal(t, pb.ErrInstanceNotExists,
resp.Response.GetCode())
+ })
+
+ t.Run("unregister service", func(t *testing.T) {
+ resp, err :=
datasource.Instance().UnregisterService(getContext(), &pb.DeleteServiceRequest{
+ ServiceId: serviceId,
+ })
+ assert.NoError(t, err)
+ assert.Equal(t, pb.ResponseSuccess, resp.Response.GetCode())
+ })
+}
diff --git a/datasource/mongo/client/dao/instance.go
b/datasource/mongo/client/dao/instance.go
index 47a7efa..3b533eb 100644
--- a/datasource/mongo/client/dao/instance.go
+++ b/datasource/mongo/client/dao/instance.go
@@ -102,3 +102,8 @@ func UpdateInstance(ctx context.Context, filter
interface{}, update interface{},
}
return nil
}
+
+func ExistInstance(ctx context.Context, serviceID string, instanceID string)
(bool, error) {
+ filter := mutil.NewBasicFilter(ctx, mutil.InstanceServiceID(serviceID),
mutil.InstanceInstanceID(instanceID))
+ return client.GetMongoClient().DocExist(ctx, model.CollectionInstance,
filter)
+}
diff --git a/datasource/mongo/heartbeat/cache/heartbeat.go
b/datasource/mongo/heartbeat/cache/heartbeat.go
index 1c99bb6..2085f12 100644
--- a/datasource/mongo/heartbeat/cache/heartbeat.go
+++ b/datasource/mongo/heartbeat/cache/heartbeat.go
@@ -62,12 +62,12 @@ type cacheConfig struct {
func configuration() *cacheConfig {
once.Do(func() {
cfg.workerNum = runtime.NumCPU()
- num := config.GetInt("registry.mongo.heartbeat.workerNum",
defaultWorkNum)
+ num := config.GetInt("heartbeat.workerNum", defaultWorkNum)
if num != 0 {
cfg.workerNum = num
}
- cfg.heartbeatTaskTimeout =
config.GetInt("registry.mongo.heartbeat.timeout", defaultTimeout)
- cfg.cacheChan = make(chan *instanceHeartbeatInfo,
config.GetInt("registry.mongo.heartbeat.cacheCapacity", defaultCacheCapacity))
+ cfg.heartbeatTaskTimeout = config.GetInt("heartbeat.timeout",
defaultTimeout)
+ cfg.cacheChan = make(chan *instanceHeartbeatInfo,
config.GetInt("heartbeat.cacheCapacity", defaultCacheCapacity))
cfg.instanceHeartbeatStore = cache.New(0,
instanceCheckerInternal)
cfg.instanceHeartbeatStore.OnEvicted(func(k string, v
interface{}) {
instanceInfo, ok := v.(*instanceHeartbeatInfo)
diff --git a/datasource/mongo/mongo.go b/datasource/mongo/mongo.go
index 8d882c7..5136485 100644
--- a/datasource/mongo/mongo.go
+++ b/datasource/mongo/mongo.go
@@ -86,7 +86,7 @@ func (ds *DataSource) initialize() error {
}
func (ds *DataSource) initPlugins() error {
- kind := config.GetString("registry.mongo.heartbeat.kind", "cache")
+ kind := config.GetString("heartbeat.kind", "cache")
err := heartbeat.Init(heartbeat.Options{PluginImplName:
heartbeat.ImplName(kind)})
if err != nil {
log.Fatal("heartbeat init failed", err)
diff --git a/datasource/mongo/ms.go b/datasource/mongo/ms.go
index f300828..32beecf 100644
--- a/datasource/mongo/ms.go
+++ b/datasource/mongo/ms.go
@@ -1530,6 +1530,21 @@ func preProcessRegister(ctx context.Context, instance
*discovery.MicroServiceIns
}, true, nil
}
+func (ds *DataSource) ExistInstanceByID(ctx context.Context, request
*discovery.MicroServiceInstanceKey) (*discovery.GetExistenceByIDResponse,
error) {
+ exist, _ := dao.ExistInstance(ctx, request.ServiceId,
request.InstanceId)
+ if !exist {
+ return &discovery.GetExistenceByIDResponse{
+ Response:
discovery.CreateResponse(discovery.ErrInstanceNotExists, "Check instance exist
failed."),
+ Exist: false,
+ }, datasource.ErrInstanceNotExists
+ }
+
+ return &discovery.GetExistenceByIDResponse{
+ Response: discovery.CreateResponse(discovery.ResponseSuccess,
"Check service exists successfully."),
+ Exist: exist,
+ }, nil
+}
+
// GetInstance returns instance under the current domain
func (ds *DataSource) GetInstance(ctx context.Context, request
*discovery.GetOneInstanceRequest) (*discovery.GetOneInstanceResponse, error) {
var service *model.Service
diff --git a/datasource/ms.go b/datasource/ms.go
index 5c63200..7e63ee3 100644
--- a/datasource/ms.go
+++ b/datasource/ms.go
@@ -25,6 +25,7 @@ import (
)
var ErrServiceNotExists = errors.New("service does not exist")
+var ErrInstanceNotExists = errors.New("instance does not exist")
// Attention: request validation must be finished before the following
interface being invoked!!!
// MetadataManager contains the CRUD of cache metadata
@@ -50,6 +51,7 @@ type MetadataManager interface {
// Instance management
RegisterInstance(ctx context.Context, request
*pb.RegisterInstanceRequest) (*pb.RegisterInstanceResponse, error)
+ ExistInstanceByID(ctx context.Context, request
*pb.MicroServiceInstanceKey) (*pb.GetExistenceByIDResponse, error)
// GetInstances returns instances under the specified service
GetInstance(ctx context.Context, request *pb.GetOneInstanceRequest)
(*pb.GetOneInstanceResponse, error)
GetInstances(ctx context.Context, request *pb.GetInstancesRequest)
(*pb.GetInstancesResponse, error)
diff --git a/docs/user-guides.rst b/docs/user-guides.rst
index dd18ba4..7bba95d 100644
--- a/docs/user-guides.rst
+++ b/docs/user-guides.rst
@@ -8,6 +8,7 @@ User Guides
user-guides/pr-raising-guide.md
user-guides/security-tls.md
user-guides/data-source.rst
+ user-guides/heartbeat.rst
user-guides/sc-cluster.rst
user-guides/integration-grafana.rst
user-guides/rbac.md
diff --git a/docs/user-guides/data-source.rst b/docs/user-guides/data-source.rst
index cf1f340..c7f9da5 100644
--- a/docs/user-guides/data-source.rst
+++ b/docs/user-guides/data-source.rst
@@ -121,17 +121,6 @@ Configure app.yaml according to your needs.
::
mongo:
- heartbeat:
- # Mongo's heartbeat plugin
- # heartbeat.kind="checker or cache"
- # if heartbeat.kind equals to 'cache', should set
cacheCapacity,workerNum and taskTimeout
- # capacity = 10000
- # workerNum = 10
- # timeout = 10
- kind: cache
- cacheCapacity: 10000
- workerNum: 10
- timeout: 10
cluster:
uri: mongodb://localhost:27017
sslEnabled: false
@@ -148,22 +137,6 @@ Configure app.yaml according to your needs.
- description
- required
- value
- * - registry.mongo.heartbeat.kind
- - there are two types of heartbeat plug-ins. With cache and without cache.
- - yes
- - cache/checker
- * - registry.mongo.heartbeat.cacheCapacity
- - cache capacity
- - yes
- - a integer, like 10000
- * - registry.mongo.heartbeat.workerNum
- - the number of working cooperations
- - yes
- - a integer, like 10
- * - registry.mongo.heartbeat.timeout
- - processing task timeout (default unit: s)
- - yes
- - a integer, like 10
* - registry.mongo.cluster.uri
- mongodb server address
- yes
diff --git a/docs/user-guides/heartbeat.rst b/docs/user-guides/heartbeat.rst
new file mode 100644
index 0000000..43e3e71
--- /dev/null
+++ b/docs/user-guides/heartbeat.rst
@@ -0,0 +1,48 @@
+Heartbeat
+========================
+Heartbeat configuration. Configure app.yaml according to your needs.
+
+::
+
+ heartbeat:
+ # configuration of websocket long connection
+ websocket:
+ pingInterval: 30s
+ # heartbeat.kind="checker or cache"
+ # if heartbeat.kind equals to 'cache', should set cacheCapacity,workerNum
and taskTimeout
+ # capacity = 10000
+ # workerNum = 10
+ # timeout = 10
+ kind: cache
+ cacheCapacity: 10000
+ workerNum: 10
+ timeout: 10
+
+.. list-table::
+ :widths: 15 20 5 10
+ :header-rows: 1
+
+ * - field
+ - description
+ - required
+ - value
+ * - heartbeat.websocket.pingInterval
+ - websocket ping interval.
+ - yes
+ - like 30s
+ * - heartbeat.kind
+ - there are two types of heartbeat plug-ins. With cache and without cache.
+ - yes
+ - cache/checker
+ * - heartbeat.cacheCapacity
+ - cache capacity
+ - yes
+ - a integer, like 10000
+ * - heartbeat.workerNum
+ - the number of working cooperations
+ - yes
+ - a integer, like 10
+ * - heartbeat.timeout
+ - processing task timeout (default unit: s)
+ - yes
+ - a integer, like 10
\ No newline at end of file
diff --git a/etc/conf/app.yaml b/etc/conf/app.yaml
index ea3cf24..1c05844 100644
--- a/etc/conf/app.yaml
+++ b/etc/conf/app.yaml
@@ -114,17 +114,6 @@ registry:
request:
timeout: 30s
mongo:
- heartbeat:
- # Mongo's heartbeat plugin
- # heartbeat.kind="checker or cache"
- # if heartbeat.kind equals to 'cache', should set
cacheCapacity,workerNum and taskTimeout
- # capacity = 10000
- # workerNum = 10
- # timeout = 10
- kind: cache
- cacheCapacity: 10000
- workerNum: 10
- timeout: 10
cluster:
uri: mongodb://127.0.0.1:27017
sslEnabled: false
@@ -209,3 +198,18 @@ auditlog:
syncer:
enabled: false
+
+heartbeat:
+ # configuration of websocket long connection
+ websocket:
+ pingInterval: 30s
+ # heartbeat.kind="checker or cache"
+ # if heartbeat.kind equals to 'cache', should set cacheCapacity,workerNum
and taskTimeout
+ # capacity = 10000
+ # workerNum = 10
+ # timeout = 10
+ kind: cache
+ cacheCapacity: 10000
+ workerNum: 10
+ timeout: 10
+
diff --git a/pkg/proto/service_ex.go b/pkg/proto/service_ex.go
index 64c0b9e..1d49227 100644
--- a/pkg/proto/service_ex.go
+++ b/pkg/proto/service_ex.go
@@ -31,5 +31,7 @@ type ServiceInstanceCtrlServerEx interface {
WebSocketWatch(ctx context.Context, in *discovery.WatchInstanceRequest,
conn *websocket.Conn)
+ WatchHeartbeat(ctx context.Context, in *discovery.HeartbeatRequest,
conn *websocket.Conn)
+
ClusterHealth(ctx context.Context) (*discovery.GetInstancesResponse,
error)
}
diff --git a/pkg/rbacframe/api.go b/pkg/rbacframe/api.go
index 8dc46b7..57b8846 100644
--- a/pkg/rbacframe/api.go
+++ b/pkg/rbacframe/api.go
@@ -20,11 +20,12 @@ package rbacframe
import (
"crypto/rsa"
+
"github.com/go-chassis/cari/rbac"
+ "k8s.io/apimachinery/pkg/util/sets"
"github.com/apache/servicecomb-service-center/pkg/util"
"github.com/go-chassis/go-chassis/v2/security/token"
- "k8s.io/apimachinery/pkg/util/sets"
)
const (
diff --git a/server/alarm/common.go b/server/alarm/common.go
index cb9df7a..2989ec9 100644
--- a/server/alarm/common.go
+++ b/server/alarm/common.go
@@ -17,6 +17,7 @@ package alarm
import (
"fmt"
+
"github.com/apache/servicecomb-service-center/pkg/event"
"github.com/apache/servicecomb-service-center/server/alarm/model"
)
diff --git a/server/connection/hbws/websocket.go
b/server/connection/hbws/websocket.go
new file mode 100644
index 0000000..314f426
--- /dev/null
+++ b/server/connection/hbws/websocket.go
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package hbws
+
+import (
+ "context"
+ "fmt"
+ "sync"
+ "time"
+
+ pb "github.com/go-chassis/cari/discovery"
+ "github.com/gorilla/websocket"
+
+ "github.com/apache/servicecomb-service-center/pkg/log"
+ "github.com/apache/servicecomb-service-center/pkg/util"
+ "github.com/apache/servicecomb-service-center/server/config"
+ "github.com/apache/servicecomb-service-center/server/connection"
+ "github.com/apache/servicecomb-service-center/server/core"
+ "github.com/apache/servicecomb-service-center/server/metrics"
+)
+
+const (
+ Websocket = "Websocket"
+ defaultPingPeriod = 30 * time.Second
+ minPeriod = 1 * time.Second
+ maxPeriod = 1 * time.Hour
+)
+
+var (
+ once sync.Once
+ pingPeriod time.Duration
+)
+
+type client struct {
+ cxt context.Context
+ conn *websocket.Conn
+ serviceID string
+ instanceID string
+}
+
+func configuration() {
+ once.Do(func() {
+ pingPeriod =
config.GetDuration("heartbeat.websocket.pingInterval", defaultPingPeriod)
+ if pingPeriod < minPeriod || pingPeriod > maxPeriod {
+ pingPeriod = defaultPingPeriod
+ }
+ })
+}
+
+func newClient(ctx context.Context, conn *websocket.Conn, serviceID string,
instanceID string) *client {
+ configuration()
+ return &client{
+ cxt: ctx,
+ conn: conn,
+ serviceID: serviceID,
+ instanceID: instanceID,
+ }
+}
+
+func (c *client) sendClose(code int, text string) error {
+ remoteAddr := c.conn.RemoteAddr().String()
+ var message []byte
+ if code != websocket.CloseNoStatusReceived {
+ message = websocket.FormatCloseMessage(code, text)
+ }
+ err := c.conn.WriteControl(websocket.CloseMessage, message,
time.Now().Add(connection.SendTimeout))
+ if err != nil {
+ log.Error(fmt.Sprintf("watcher[%s] catch an err", remoteAddr),
err)
+ return err
+ }
+ return nil
+}
+
+func (c *client) heartbeat() {
+ remoteAddr := c.conn.RemoteAddr().String()
+ ticker := time.NewTicker(pingPeriod)
+ defer func() {
+ ticker.Stop()
+ c.conn.Close()
+ }()
+ for {
+ <-ticker.C
+ err :=
c.conn.SetWriteDeadline(time.Now().Add(connection.SendTimeout))
+ if err != nil {
+ log.Error("", err)
+ }
+ if err := c.conn.WriteMessage(websocket.PingMessage, nil); err
!= nil {
+ log.Error(fmt.Sprintf("send 'Ping' message to
watcher[%s] failed", remoteAddr), err)
+ return
+ }
+ }
+}
+
+func (c *client) handleMessage() {
+ defer func() {
+ c.conn.Close()
+ }()
+
+ remoteAddr := c.conn.RemoteAddr().String()
+ c.conn.SetPongHandler(func(message string) error {
+ err :=
c.conn.SetReadDeadline(time.Now().Add(connection.ReadTimeout))
+ if err != nil {
+ log.Error("", err)
+ }
+ log.Infof("received 'Pong' message '%s' from watcher[%s]\n",
message, remoteAddr)
+ request := &pb.HeartbeatRequest{
+ ServiceId: c.serviceID,
+ InstanceId: c.instanceID,
+ }
+ _, err = core.InstanceAPI.Heartbeat(c.cxt, request)
+ if err != nil {
+ log.Error("instance heartbeat report failed ", err)
+ }
+ return err
+ })
+
+ c.conn.SetCloseHandler(func(code int, text string) error {
+ log.Info(fmt.Sprintf("watcher[%s] active closed, code: %d,
message: '%s'", remoteAddr, code, text))
+ return c.sendClose(code, text)
+ })
+ for {
+ _, _, err := c.conn.ReadMessage()
+ if err != nil {
+ if websocket.IsUnexpectedCloseError(err,
websocket.CloseGoingAway, websocket.CloseAbnormalClosure) {
+ log.Error("", err)
+ }
+ break
+ }
+ }
+}
+
+func SendEstablishError(conn *websocket.Conn, err error) {
+ remoteAddr := conn.RemoteAddr().String()
+ log.Errorf(err, "establish[%s] websocket failed.", remoteAddr)
+ if err := conn.WriteMessage(websocket.TextMessage,
util.StringToBytesWithNoCopy(err.Error())); err != nil {
+ log.Errorf(err, "establish[%s] websocket failed: write message
failed.", remoteAddr)
+ }
+}
+
+func Heartbeat(ctx context.Context, conn *websocket.Conn, serviceID string,
instanceID string) {
+ domain := util.ParseDomain(ctx)
+ client := newClient(ctx, conn, serviceID, instanceID)
+ metrics.ReportSubscriber(domain, Websocket, 1)
+ process(client)
+ metrics.ReportSubscriber(domain, Websocket, -1)
+}
+
+func process(client *client) {
+ go client.heartbeat()
+ client.handleMessage()
+}
diff --git a/server/connection/hbws/websocket_test.go
b/server/connection/hbws/websocket_test.go
new file mode 100644
index 0000000..729e780
--- /dev/null
+++ b/server/connection/hbws/websocket_test.go
@@ -0,0 +1,67 @@
+package hbws_test
+
+import (
+ "context"
+ "net/http"
+ "net/http/httptest"
+ "strings"
+ "testing"
+ "time"
+
+ "github.com/gorilla/websocket"
+ "github.com/stretchr/testify/assert"
+
+ "github.com/apache/servicecomb-service-center/server/connection/hbws"
+ "github.com/apache/servicecomb-service-center/server/core"
+)
+
+var closeCh = make(chan struct{})
+
+func init() {
+ testing.Init()
+ core.Initialize()
+}
+
+type watcherConn struct {
+ clientConn *websocket.Conn
+ serverConn *websocket.Conn
+}
+
+func (h *watcherConn) Test() {
+ s := httptest.NewServer(h)
+ h.clientConn, _, _ = websocket.DefaultDialer.Dial(
+ strings.Replace(s.URL, "http://", "ws://", 1), nil)
+}
+
+func (h *watcherConn) ServeHTTP(w http.ResponseWriter, r *http.Request) {
+ var upgrader = websocket.Upgrader{}
+ h.serverConn, _ = upgrader.Upgrade(w, r, nil)
+ for {
+ //h.ServerConn.WriteControl(websocket.PingMessage, []byte{},
time.Now().Add(time.Second))
+ //h.ServerConn.WriteControl(websocket.PongMessage, []byte{},
time.Now().Add(time.Second))
+ _, _, err := h.serverConn.ReadMessage()
+ if err != nil {
+ return
+ }
+ <-closeCh
+ h.serverConn.WriteControl(websocket.CloseMessage, []byte{},
time.Now().Add(time.Second))
+ h.serverConn.Close()
+ return
+ }
+}
+
+func NewTest() *watcherConn {
+ ts := &watcherConn{}
+ ts.Test()
+ return ts
+}
+
+func TestHeartbeat(t *testing.T) {
+ mock := NewTest()
+ go hbws.Heartbeat(context.Background(), mock.serverConn, "", "")
+ err := mock.serverConn.WriteMessage(websocket.TextMessage,
[]byte("hello"))
+ assert.Nil(t, err)
+ _, p, err := mock.clientConn.ReadMessage()
+ assert.Nil(t, err)
+ assert.Equal(t, "hello", string(p))
+}
diff --git a/server/connection/ws/websocket.go
b/server/connection/ws/websocket.go
index e9fe3c1..0e9a7e0 100644
--- a/server/connection/ws/websocket.go
+++ b/server/connection/ws/websocket.go
@@ -20,9 +20,10 @@ package ws
import (
"context"
"fmt"
- "github.com/apache/servicecomb-service-center/pkg/util"
"time"
+ "github.com/apache/servicecomb-service-center/pkg/util"
+
"github.com/apache/servicecomb-service-center/datasource"
"github.com/apache/servicecomb-service-center/pkg/log"
"github.com/apache/servicecomb-service-center/server/connection"
@@ -32,7 +33,7 @@ import (
const Websocket = "Websocket"
-var errServiceNotExist = fmt.Errorf("Service does not exist.")
+var errServiceNotExist = fmt.Errorf("service does not exist")
type WebSocket struct {
Options
diff --git a/server/connection/ws/websocket_test.go
b/server/connection/ws/websocket_test.go
index ca40bd6..7518b5a 100644
--- a/server/connection/ws/websocket_test.go
+++ b/server/connection/ws/websocket_test.go
@@ -159,6 +159,6 @@ func TestWebSocket_CheckHealth(t *testing.T) {
})
t.Run("should return err when consumer not exist", func(t *testing.T) {
ws := wss.NewWebSocket("", "", mock.ServerConn)
- assert.Equal(t, "Service does not exist.",
ws.CheckHealth(context.Background()).Error())
+ assert.Equal(t, "service does not exist",
ws.CheckHealth(context.Background()).Error())
})
}
diff --git a/server/event/instance_subscriber.go
b/server/event/instance_subscriber.go
index 9d5c1c4..fff474b 100644
--- a/server/event/instance_subscriber.go
+++ b/server/event/instance_subscriber.go
@@ -19,6 +19,7 @@ package event
import (
"fmt"
+
"github.com/apache/servicecomb-service-center/pkg/event"
"github.com/apache/servicecomb-service-center/pkg/log"
"github.com/apache/servicecomb-service-center/server/metrics"
diff --git a/server/metrics/connection.go b/server/metrics/connection.go
index c012980..18366dd 100644
--- a/server/metrics/connection.go
+++ b/server/metrics/connection.go
@@ -18,9 +18,10 @@
package metrics
import (
- "github.com/apache/servicecomb-service-center/pkg/event"
"time"
+ "github.com/apache/servicecomb-service-center/pkg/event"
+
"github.com/apache/servicecomb-service-center/pkg/metrics"
helper "github.com/apache/servicecomb-service-center/pkg/prometheus"
"github.com/prometheus/client_golang/prometheus"
diff --git a/server/rest/controller/v4/instance_watcher.go
b/server/rest/controller/v4/instance_watcher.go
index 9396ee5..c8bf7f1 100644
--- a/server/rest/controller/v4/instance_watcher.go
+++ b/server/rest/controller/v4/instance_watcher.go
@@ -34,6 +34,7 @@ type WatchService struct {
func (s *WatchService) URLPatterns() []rest.Route {
return []rest.Route{
{Method: rest.HTTPMethodGet, Path:
"/v4/:project/registry/microservices/:serviceId/watcher", Func: s.Watch},
+ {Method: rest.HTTPMethodGet, Path:
"/v4/:project/registry/microservices/:serviceId/instances/:instanceId/heartbeat",
Func: s.Heartbeat},
}
}
@@ -62,3 +63,16 @@ func (s *WatchService) Watch(w http.ResponseWriter, r
*http.Request) {
SelfServiceId: r.URL.Query().Get(":serviceId"),
}, conn)
}
+
+func (s *WatchService) Heartbeat(w http.ResponseWriter, r *http.Request) {
+ conn, err := upgrade(w, r)
+ if err != nil {
+ log.Error("failed to establish connection", err)
+ return
+ }
+ defer conn.Close()
+ core.InstanceAPI.WatchHeartbeat(r.Context(), &pb.HeartbeatRequest{
+ ServiceId: r.URL.Query().Get(":serviceId"),
+ InstanceId: r.URL.Query().Get(":instanceId"),
+ }, conn)
+}
diff --git a/server/server.go b/server/server.go
index 8b6f099..9579c28 100644
--- a/server/server.go
+++ b/server/server.go
@@ -19,12 +19,13 @@ package server
import (
"context"
- "github.com/apache/servicecomb-service-center/server/event"
"net"
"os"
"strings"
"time"
+ "github.com/apache/servicecomb-service-center/server/event"
+
"github.com/apache/servicecomb-service-center/datasource"
nf "github.com/apache/servicecomb-service-center/pkg/event"
"github.com/apache/servicecomb-service-center/pkg/gopool"
diff --git a/server/service/watch.go b/server/service/watch.go
index 8813b0f..998c35a 100644
--- a/server/service/watch.go
+++ b/server/service/watch.go
@@ -22,13 +22,15 @@ import (
"errors"
"fmt"
+ pb "github.com/go-chassis/cari/discovery"
+ "github.com/gorilla/websocket"
+
"github.com/apache/servicecomb-service-center/datasource"
"github.com/apache/servicecomb-service-center/pkg/log"
"github.com/apache/servicecomb-service-center/pkg/proto"
"github.com/apache/servicecomb-service-center/server/connection/grpc"
+ "github.com/apache/servicecomb-service-center/server/connection/hbws"
"github.com/apache/servicecomb-service-center/server/connection/ws"
- pb "github.com/go-chassis/cari/discovery"
- "github.com/gorilla/websocket"
)
func (s *InstanceService) WatchPreOpera(ctx context.Context, in
*pb.WatchInstanceRequest) error {
@@ -39,6 +41,7 @@ func (s *InstanceService) WatchPreOpera(ctx context.Context,
in *pb.WatchInstanc
ServiceId: in.SelfServiceId,
})
if err != nil {
+ log.Error("", err)
return err
}
if !resp.Exist {
@@ -47,6 +50,23 @@ func (s *InstanceService) WatchPreOpera(ctx context.Context,
in *pb.WatchInstanc
return nil
}
+func (s *InstanceService) HeartBeatPreOpera(ctx context.Context, in
*pb.HeartbeatRequest) error {
+ if in == nil || len(in.ServiceId) == 0 || len(in.InstanceId) == 0 {
+ return errors.New("request format invalid")
+ }
+ resp, err := datasource.Instance().ExistInstanceByID(ctx,
&pb.MicroServiceInstanceKey{
+ ServiceId: in.ServiceId,
+ InstanceId: in.InstanceId,
+ })
+ if err != nil {
+ return err
+ }
+ if !resp.Exist {
+ return datasource.ErrInstanceNotExists
+ }
+ return nil
+}
+
func (s *InstanceService) Watch(in *pb.WatchInstanceRequest, stream
proto.ServiceInstanceCtrlWatchServer) error {
log.Infof("new a stream list and watch with service[%s]",
in.SelfServiceId)
if err := s.WatchPreOpera(stream.Context(), in); err != nil {
@@ -66,6 +86,15 @@ func (s *InstanceService) WebSocketWatch(ctx
context.Context, in *pb.WatchInstan
ws.Watch(ctx, in.SelfServiceId, conn)
}
+func (s *InstanceService) WatchHeartbeat(ctx context.Context, in
*pb.HeartbeatRequest, conn *websocket.Conn) {
+ log.Info(fmt.Sprintf("new a web socket with service[%s] ,instance[%s]",
in.ServiceId, in.InstanceId))
+ if err := s.HeartBeatPreOpera(ctx, in); err != nil {
+ hbws.SendEstablishError(conn, err)
+ return
+ }
+ hbws.Heartbeat(ctx, conn, in.ServiceId, in.InstanceId)
+}
+
func (s *InstanceService) QueryAllProvidersInstances(ctx context.Context, in
*pb.WatchInstanceRequest) ([]*pb.WatchInstanceResponse, int64) {
depResp, err := datasource.Instance().SearchConsumerDependency(ctx,
&pb.GetDependenciesRequest{
ServiceId: in.SelfServiceId,