This is an automated email from the ASF dual-hosted git repository.

tianxiaoliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/servicecomb-service-center.git


The following commit(s) were added to refs/heads/master by this push:
     new a0e76c3  [SCB-2094] Bugfix-cache heartbeat mode in mongo (#874)
a0e76c3 is described below

commit a0e76c30ca34a9340e177b8e4b5e212110f62d18
Author: robotLJW <[email protected]>
AuthorDate: Fri Feb 26 18:33:04 2021 +0800

    [SCB-2094] Bugfix-cache heartbeat mode in mongo (#874)
    
    1. When mongodb is used, SC does not implement its own heartbeat reporting interface
---
 datasource/mongo/engine.go                         | 50 +++++++++++++++++++---
 datasource/mongo/heartbeat/cache/heartbeatcache.go | 15 +++++++
 datasource/mongo/mongo.go                          |  4 +-
 3 files changed, 61 insertions(+), 8 deletions(-)

diff --git a/datasource/mongo/engine.go b/datasource/mongo/engine.go
index 07bca41..1024fd2 100644
--- a/datasource/mongo/engine.go
+++ b/datasource/mongo/engine.go
@@ -19,23 +19,23 @@ package mongo
 
 import (
        "context"
-       "time"
-
-       "github.com/apache/servicecomb-service-center/pkg/cluster"
-
        "fmt"
        "strconv"
        "strings"
+       "time"
+
+       pb "github.com/go-chassis/cari/discovery"
+       "go.mongodb.org/mongo-driver/bson"
 
        "github.com/apache/servicecomb-service-center/datasource"
        "github.com/apache/servicecomb-service-center/datasource/etcd/path"
        "github.com/apache/servicecomb-service-center/datasource/mongo/client"
+       "github.com/apache/servicecomb-service-center/pkg/cluster"
+       "github.com/apache/servicecomb-service-center/pkg/gopool"
        "github.com/apache/servicecomb-service-center/pkg/log"
        "github.com/apache/servicecomb-service-center/pkg/util"
        "github.com/apache/servicecomb-service-center/server/core"
        "github.com/apache/servicecomb-service-center/server/metrics"
-       pb "github.com/go-chassis/cari/discovery"
-       "go.mongodb.org/mongo-driver/bson"
 )
 
 func (ds *DataSource) SelfRegister(ctx context.Context) error {
@@ -192,8 +192,44 @@ func (ds *DataSource) registryInstance(pCtx 
context.Context) error {
        return nil
 }
 
+func (ds *DataSource) selfHeartBeat(pCtx context.Context) error {
+       ctx := core.AddDefaultContextValue(pCtx)
+       respI, err := core.InstanceAPI.Heartbeat(ctx, core.HeartbeatRequest())
+       if err != nil {
+               log.Error("send heartbeat failed", err)
+               return err
+       }
+       if respI.Response.GetCode() == pb.ResponseSuccess {
+               log.Debugf("update service center instance[%s/%s] heartbeat",
+                       core.Instance.ServiceId, core.Instance.InstanceId)
+               return nil
+       }
+       err = fmt.Errorf(respI.Response.GetMessage())
+       log.Errorf(err, "update service center instance[%s/%s] heartbeat 
failed",
+               core.Instance.ServiceId, core.Instance.InstanceId)
+       return err
+}
+
+// autoSelfHeartBeat starts a background task via gopool.Go that periodically
+// sends the service center's own heartbeat. The task waits
+// core.Instance.HealthCheck.Interval seconds between attempts and stops when
+// the context handed to it by gopool is done. Whenever a heartbeat attempt
+// returns an error, the task calls SelfRegister and logs a failure of that
+// call; it then keeps looping.
 func (ds *DataSource) autoSelfHeartBeat() {
-       //todo
+       gopool.Go(func(ctx context.Context) {
+               for {
+                       select {
+                       case <-ctx.Done():
+                               return
+                       // The interval is re-read from core.Instance on every iteration.
+                       case 
<-time.After(time.Duration(core.Instance.HealthCheck.Interval) * time.Second):
+                               err := ds.selfHeartBeat(ctx)
+                               if err == nil {
+                                       continue
+                               }
+                               // Heartbeat failed: treat the service as missing and register it again.
+                               err = ds.SelfRegister(ctx)
+                               if err != nil {
+                                       log.Errorf(err, "retry to 
register[%s/%s/%s/%s] failed",
+                                               core.Service.Environment, 
core.Service.AppId, core.Service.ServiceName, core.Service.Version)
+                               }
+                       }
+               }
+       })
 }
 
 func GetAllServicesAcrossDomainProject(ctx context.Context) 
(map[string][]*pb.MicroService, error) {
diff --git a/datasource/mongo/heartbeat/cache/heartbeatcache.go 
b/datasource/mongo/heartbeat/cache/heartbeatcache.go
index a9f4d80..e3d229f 100644
--- a/datasource/mongo/heartbeat/cache/heartbeatcache.go
+++ b/datasource/mongo/heartbeat/cache/heartbeatcache.go
@@ -29,6 +29,14 @@ import (
        "github.com/apache/servicecomb-service-center/pkg/util"
 )
 
+// Bounds and fallback values applied to an instance's health-check interval
+// and retry count before a heartbeat task is added for it.
+// NOTE(review): the interval is presumably in seconds — confirm against the
+// HealthCheck definition.
+const (
+       // maxInterval is the largest accepted health-check interval.
+       maxInterval     = 60
+       // minInterval is the intended lower bound for the interval.
+       // NOTE(review): the range check visible in this patch compares the
+       // interval against minTimes instead of this constant; both are 0, so
+       // the result is the same, but confirm which one was meant.
+       minInterval     = 0
+       // defaultInterval replaces an interval that falls outside the range.
+       defaultInterval = 30
+       // maxTimes is the largest accepted retry count and also the value
+       // substituted when the count falls outside the range.
+       maxTimes        = 3
+       // minTimes is the smallest accepted retry count.
+       minTimes        = 0
+)
+
 var ErrHeartbeatConversionFailed = errors.New("instanceHeartbeatInfo type 
conversion failed. ")
 
 func init() {
@@ -91,6 +99,13 @@ func notInCacheStrategy(ctx context.Context, request 
*pb.HeartbeatRequest) (*pb.
                return resp, err
        }
        interval, times := instance.Instance.HealthCheck.Interval, 
instance.Instance.HealthCheck.Times
+       // Set the range of interval and time
+       if interval > maxInterval || interval < minTimes {
+               interval = defaultInterval
+       }
+       if times > maxTimes || times < minTimes {
+               times = maxTimes
+       }
        err = addHeartbeatTask(request.ServiceId, request.InstanceId, 
interval*(times+1))
        if err != nil {
                log.Error(fmt.Sprintf("heartbeat failed, instance[%s]. operator 
%s", request.InstanceId, remoteIP), err)
diff --git a/datasource/mongo/mongo.go b/datasource/mongo/mongo.go
index 2ba5388..af0f1d7 100644
--- a/datasource/mongo/mongo.go
+++ b/datasource/mongo/mongo.go
@@ -31,6 +31,8 @@ import (
        "go.mongodb.org/mongo-driver/mongo/options"
 )
 
+// defaultExpireTime is the value passed to SetExpireAfterSeconds for the
+// index built on ColumnRefreshTime in EnsureInstance.
+const defaultExpireTime = 300
+
 func init() {
        datasource.Install("mongo", NewDataSource)
 }
@@ -139,7 +141,7 @@ func EnsureInstance() {
        wrapCreateCollectionError(err)
 
        instanceIndex := BuildIndexDoc(ColumnRefreshTime)
-       instanceIndex.Options = options.Index().SetExpireAfterSeconds(60)
+       instanceIndex.Options = 
options.Index().SetExpireAfterSeconds(defaultExpireTime)
 
        instanceServiceIndex := 
BuildIndexDoc(StringBuilder([]string{ColumnInstance, ColumnServiceID}))
 

Reply via email to