This is an automated email from the ASF dual-hosted git repository.

tianxiaoliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/servicecomb-service-center.git


The following commit(s) were added to refs/heads/master by this push:
     new 677b100  SCB-2094 BugFix: join the heartbeat cache (#943)
677b100 is described below

commit 677b1006e130ea9512c63cb928b788614824d145
Author: robotljw <[email protected]>
AuthorDate: Thu Apr 15 15:33:37 2021 +0800

    SCB-2094 BugFix: join the heartbeat cache (#943)
---
 datasource/mongo/heartbeat/cache/heartbeat.go      | 97 +++++++++++-----------
 datasource/mongo/heartbeat/cache/heartbeat_test.go | 22 ++---
 datasource/mongo/heartbeat/cache/heartbeatcache.go | 24 ++++--
 .../mongo/heartbeat/cache/heartbeatcache_test.go   |  9 +-
 .../heartbeat/{healthcheck.go => cache/types.go}   | 20 ++---
 .../mongo/heartbeat/checker/heartbeatchecker.go    |  5 ++
 datasource/mongo/heartbeat/healthcheck.go          |  3 +
 datasource/mongo/ms.go                             |  8 +-
 8 files changed, 106 insertions(+), 82 deletions(-)

diff --git a/datasource/mongo/heartbeat/cache/heartbeat.go 
b/datasource/mongo/heartbeat/cache/heartbeat.go
index 61e9b53..4094cfd 100644
--- a/datasource/mongo/heartbeat/cache/heartbeat.go
+++ b/datasource/mongo/heartbeat/cache/heartbeat.go
@@ -22,6 +22,7 @@ import (
        "errors"
        "fmt"
        "runtime"
+       "sync"
        "time"
 
        "github.com/patrickmn/go-cache"
@@ -44,59 +45,61 @@ const (
        ctxTimeout              = 5 * time.Second
 )
 
-// Store cache structure
-type instanceHeartbeatInfo struct {
-       serviceID   string
-       instanceID  string
-       ttl         int32
-       lastRefresh time.Time
-}
+var ErrHeartbeatTimeout = errors.New("heartbeat task waiting for processing 
timeout. ")
 
 var (
-       cacheChan              chan *instanceHeartbeatInfo
-       instanceHeartbeatStore = cache.New(0, instanceCheckerInternal)
-       workerNum              = runtime.NumCPU()
-       heartbeatTaskTimeout   = 
config.GetInt("registry.mongo.heartbeat.timeout", defaultTimeout)
-       ErrHeartbeatTimeout    = errors.New("heartbeat task waiting for 
processing timeout. ")
+       once sync.Once
+       cfg  cacheConfig
 )
 
-func init() {
-       capacity := config.GetInt("registry.mongo.heartbeat.cacheCapacity", 
defaultCacheCapacity)
-       cacheChan = make(chan *instanceHeartbeatInfo, capacity)
-       num := config.GetInt("registry.mongo.heartbeat.workerNum", 
defaultWorkNum)
-       if num != 0 {
-               workerNum = num
-       }
-       instanceHeartbeatStore.OnEvicted(func(k string, v interface{}) {
-               instanceInfo, ok := v.(*instanceHeartbeatInfo)
-               if ok && instanceInfo != nil {
-                       ctx, cancel := 
context.WithTimeout(context.Background(), ctxTimeout)
-                       defer cancel()
-                       err := cleanInstance(ctx, instanceInfo.serviceID, 
instanceInfo.instanceID)
-                       if err != nil {
-                               log.Error("failed to cleanInstance in 
mongodb.", err)
-                       }
-               }
-       })
+type cacheConfig struct {
+       cacheChan              chan *instanceHeartbeatInfo
+       instanceHeartbeatStore *cache.Cache
+       workerNum              int
+       heartbeatTaskTimeout   int
+}
 
-       for i := 1; i <= workerNum; i++ {
-               gopool.Go(func(ctx context.Context) {
-                       for {
-                               select {
-                               case <-ctx.Done():
-                                       log.Warn("heartbeat work protocol 
exit.")
-                                       return
-                               case heartbeatInfo, ok := <-cacheChan:
-                                       if ok {
-                                               
instanceHeartbeatStore.Set(heartbeatInfo.instanceID, heartbeatInfo, 
time.Duration(heartbeatInfo.ttl)*time.Second)
-                                       }
+func configuration() *cacheConfig {
+       once.Do(func() {
+               cfg.workerNum = runtime.NumCPU()
+               num := config.GetInt("registry.mongo.heartbeat.workerNum", 
defaultWorkNum)
+               if num != 0 {
+                       cfg.workerNum = num
+               }
+               cfg.heartbeatTaskTimeout = 
config.GetInt("registry.mongo.heartbeat.timeout", defaultTimeout)
+               cfg.cacheChan = make(chan *instanceHeartbeatInfo, 
config.GetInt("registry.mongo.heartbeat.cacheCapacity", defaultCacheCapacity))
+               cfg.instanceHeartbeatStore = cache.New(0, 
instanceCheckerInternal)
+               cfg.instanceHeartbeatStore.OnEvicted(func(k string, v 
interface{}) {
+                       instanceInfo, ok := v.(*instanceHeartbeatInfo)
+                       if ok && instanceInfo != nil {
+                               ctx, cancel := 
context.WithTimeout(context.Background(), ctxTimeout)
+                               defer cancel()
+                               err := cleanInstance(ctx, 
instanceInfo.serviceID, instanceInfo.instanceID)
+                               if err != nil {
+                                       log.Error("failed to cleanInstance in 
mongodb.", err)
                                }
                        }
                })
-       }
+               for i := 1; i <= cfg.workerNum; i++ {
+                       gopool.Go(func(ctx context.Context) {
+                               for {
+                                       select {
+                                       case <-ctx.Done():
+                                               log.Warn("heartbeat work 
protocol exit.")
+                                               return
+                                       case heartbeatInfo, ok := 
<-cfg.cacheChan:
+                                               if ok {
+                                                       
cfg.instanceHeartbeatStore.Set(heartbeatInfo.instanceID, heartbeatInfo, 
time.Duration(heartbeatInfo.ttl)*time.Second)
+                                               }
+                                       }
+                               }
+                       })
+               }
+       })
+       return &cfg
 }
 
-func addHeartbeatTask(serviceID string, instanceID string, ttl int32) error {
+func (c *cacheConfig) AddHeartbeatTask(serviceID string, instanceID string, 
ttl int32) error {
        // Unassigned setting default value is 30s
        if ttl <= 0 {
                ttl = defaultTTL
@@ -108,16 +111,16 @@ func addHeartbeatTask(serviceID string, instanceID 
string, ttl int32) error {
                lastRefresh: time.Now(),
        }
        select {
-       case cacheChan <- newInstance:
+       case c.cacheChan <- newInstance:
                return nil
-       case <-time.After(time.Duration(heartbeatTaskTimeout) * time.Second):
+       case <-time.After(time.Duration(c.heartbeatTaskTimeout) * time.Second):
                log.Warn("the heartbeat's channel is full. ")
                return ErrHeartbeatTimeout
        }
 }
 
-func RemoveCacheInstance(instanceID string) {
-       instanceHeartbeatStore.Delete(instanceID)
+func (c *cacheConfig) RemoveCacheInstance(instanceID string) {
+       c.instanceHeartbeatStore.Delete(instanceID)
 }
 
 func cleanInstance(ctx context.Context, serviceID string, instanceID string) 
error {
diff --git a/datasource/mongo/heartbeat/cache/heartbeat_test.go 
b/datasource/mongo/heartbeat/cache/heartbeat_test.go
index d6845e6..26bbc4a 100644
--- a/datasource/mongo/heartbeat/cache/heartbeat_test.go
+++ b/datasource/mongo/heartbeat/cache/heartbeat_test.go
@@ -42,6 +42,8 @@ func init() {
        client.NewMongoClient(config)
 }
 
+var c = configuration()
+
 func TestAddCacheInstance(t *testing.T) {
        t.Run("add cache instance: set the ttl to 2 seconds", func(t 
*testing.T) {
                instance1 := model.Instance{
@@ -55,11 +57,11 @@ func TestAddCacheInstance(t *testing.T) {
                                },
                        },
                }
-               err := addHeartbeatTask(instance1.Instance.ServiceId, 
instance1.Instance.InstanceId, 
instance1.Instance.HealthCheck.Interval*(instance1.Instance.HealthCheck.Times+1))
+               err := c.AddHeartbeatTask(instance1.Instance.ServiceId, 
instance1.Instance.InstanceId, 
instance1.Instance.HealthCheck.Interval*(instance1.Instance.HealthCheck.Times+1))
                assert.Equal(t, nil, err)
                _, err = client.GetMongoClient().Insert(context.Background(), 
model.CollectionInstance, instance1)
                assert.Equal(t, nil, err)
-               info, ok := 
instanceHeartbeatStore.Get(instance1.Instance.InstanceId)
+               info, ok := 
c.instanceHeartbeatStore.Get(instance1.Instance.InstanceId)
                assert.Equal(t, true, ok)
                if ok {
                        heartBeatInfo := info.(*instanceHeartbeatInfo)
@@ -67,7 +69,7 @@ func TestAddCacheInstance(t *testing.T) {
                        assert.Equal(t, 
instance1.Instance.HealthCheck.Interval*(instance1.Instance.HealthCheck.Times+1),
 heartBeatInfo.ttl)
                }
                time.Sleep(2 * time.Second)
-               _, ok = 
instanceHeartbeatStore.Get(instance1.Instance.InstanceId)
+               _, ok = 
c.instanceHeartbeatStore.Get(instance1.Instance.InstanceId)
                assert.Equal(t, false, ok)
                _, err = client.GetMongoClient().Delete(context.Background(), 
model.CollectionInstance, instance1)
                assert.Equal(t, nil, err)
@@ -85,11 +87,11 @@ func TestAddCacheInstance(t *testing.T) {
                                },
                        },
                }
-               err := addHeartbeatTask(instance1.Instance.ServiceId, 
instance1.Instance.InstanceId, 
instance1.Instance.HealthCheck.Interval*(instance1.Instance.HealthCheck.Times+1))
+               err := c.AddHeartbeatTask(instance1.Instance.ServiceId, 
instance1.Instance.InstanceId, 
instance1.Instance.HealthCheck.Interval*(instance1.Instance.HealthCheck.Times+1))
                assert.Equal(t, nil, err)
                _, err = client.GetMongoClient().Insert(context.Background(), 
model.CollectionInstance, instance1)
                assert.Equal(t, nil, err)
-               info, ok := 
instanceHeartbeatStore.Get(instance1.Instance.InstanceId)
+               info, ok := 
c.instanceHeartbeatStore.Get(instance1.Instance.InstanceId)
                assert.Equal(t, true, ok)
                if ok {
                        heartBeatInfo := info.(*instanceHeartbeatInfo)
@@ -97,7 +99,7 @@ func TestAddCacheInstance(t *testing.T) {
                        assert.Equal(t, int32(defaultTTL), heartBeatInfo.ttl)
                }
                time.Sleep(defaultTTL * time.Second)
-               _, ok = 
instanceHeartbeatStore.Get(instance1.Instance.InstanceId)
+               _, ok = 
c.instanceHeartbeatStore.Get(instance1.Instance.InstanceId)
                assert.Equal(t, false, ok)
                _, err = client.GetMongoClient().Delete(context.Background(), 
model.CollectionInstance, instance1)
                assert.Equal(t, nil, err)
@@ -117,11 +119,11 @@ func TestRemoveCacheInstance(t *testing.T) {
                                },
                        },
                }
-               err := addHeartbeatTask(instance1.Instance.ServiceId, 
instance1.Instance.InstanceId, 
instance1.Instance.HealthCheck.Interval*(instance1.Instance.HealthCheck.Times+1))
+               err := c.AddHeartbeatTask(instance1.Instance.ServiceId, 
instance1.Instance.InstanceId, 
instance1.Instance.HealthCheck.Interval*(instance1.Instance.HealthCheck.Times+1))
                assert.Equal(t, nil, err)
                _, err = client.GetMongoClient().Insert(context.Background(), 
model.CollectionInstance, instance1)
                assert.Equal(t, nil, err)
-               info, ok := 
instanceHeartbeatStore.Get(instance1.Instance.InstanceId)
+               info, ok := 
c.instanceHeartbeatStore.Get(instance1.Instance.InstanceId)
                assert.Equal(t, true, ok)
                if ok {
                        heartBeatInfo := info.(*instanceHeartbeatInfo)
@@ -129,8 +131,8 @@ func TestRemoveCacheInstance(t *testing.T) {
                        assert.Equal(t, 
instance1.Instance.HealthCheck.Interval*(instance1.Instance.HealthCheck.Times+1),
 heartBeatInfo.ttl)
                }
                time.Sleep(4 * time.Second)
-               RemoveCacheInstance(instance1.Instance.InstanceId)
-               _, ok = 
instanceHeartbeatStore.Get(instance1.Instance.InstanceId)
+               c.RemoveCacheInstance(instance1.Instance.InstanceId)
+               _, ok = 
c.instanceHeartbeatStore.Get(instance1.Instance.InstanceId)
                assert.Equal(t, false, ok)
                _, err = client.GetMongoClient().Delete(context.Background(), 
model.CollectionInstance, instance1)
                assert.Equal(t, nil, err)
diff --git a/datasource/mongo/heartbeat/cache/heartbeatcache.go 
b/datasource/mongo/heartbeat/cache/heartbeatcache.go
index 31ae699..c6381e9 100644
--- a/datasource/mongo/heartbeat/cache/heartbeatcache.go
+++ b/datasource/mongo/heartbeat/cache/heartbeatcache.go
@@ -44,20 +44,26 @@ func init() {
 }
 
 type HeartBeatCache struct {
+       cfg *cacheConfig
 }
 
 func NewHeartBeatCache(opts heartbeat.Options) (heartbeat.HealthCheck, error) {
-       return &HeartBeatCache{}, nil
+       return &HeartBeatCache{cfg: configuration()}, nil
 }
 
 func (h *HeartBeatCache) Heartbeat(ctx context.Context, request 
*pb.HeartbeatRequest) (*pb.HeartbeatResponse, error) {
-       if ins, ok := instanceHeartbeatStore.Get(request.InstanceId); ok {
-               return inCacheStrategy(ctx, request, ins)
+       if ins, ok := h.cfg.instanceHeartbeatStore.Get(request.InstanceId); ok {
+               return h.inCacheStrategy(ctx, request, ins)
        }
-       return notInCacheStrategy(ctx, request)
+       return h.notInCacheStrategy(ctx, request)
 }
 
-func inCacheStrategy(ctx context.Context, request *pb.HeartbeatRequest, 
insHeartbeatInfo interface{}) (*pb.HeartbeatResponse, error) {
+// Add instance related information to the cache
+func (h *HeartBeatCache) CheckInstance(ctx context.Context, instance 
*pb.MicroServiceInstance) error {
+       return h.cfg.AddHeartbeatTask(instance.ServiceId, instance.InstanceId, 
instance.HealthCheck.Interval*(instance.HealthCheck.Times+1))
+}
+
+func (h *HeartBeatCache) inCacheStrategy(ctx context.Context, request 
*pb.HeartbeatRequest, insHeartbeatInfo interface{}) (*pb.HeartbeatResponse, 
error) {
        remoteIP := util.GetIPFromContext(ctx)
        heartbeatInfo, ok := insHeartbeatInfo.(*instanceHeartbeatInfo)
        if !ok {
@@ -67,7 +73,7 @@ func inCacheStrategy(ctx context.Context, request 
*pb.HeartbeatRequest, insHeart
                }
                return resp, ErrHeartbeatConversionFailed
        }
-       err := addHeartbeatTask(request.ServiceId, request.InstanceId, 
heartbeatInfo.ttl)
+       err := h.cfg.AddHeartbeatTask(request.ServiceId, request.InstanceId, 
heartbeatInfo.ttl)
        if err != nil {
                log.Error(fmt.Sprintf("heartbeat failed, instance[%s]. operator 
%s", request.InstanceId, remoteIP), err)
                resp := &pb.HeartbeatResponse{
@@ -88,7 +94,7 @@ func inCacheStrategy(ctx context.Context, request 
*pb.HeartbeatRequest, insHeart
        }, nil
 }
 
-func notInCacheStrategy(ctx context.Context, request *pb.HeartbeatRequest) 
(*pb.HeartbeatResponse, error) {
+func (h *HeartBeatCache) notInCacheStrategy(ctx context.Context, request 
*pb.HeartbeatRequest) (*pb.HeartbeatResponse, error) {
        remoteIP := util.GetIPFromContext(ctx)
        instance, err := findInstance(ctx, request.ServiceId, 
request.InstanceId)
        if err != nil {
@@ -106,7 +112,7 @@ func notInCacheStrategy(ctx context.Context, request 
*pb.HeartbeatRequest) (*pb.
        if times > maxTimes || times < minTimes {
                times = maxTimes
        }
-       err = addHeartbeatTask(request.ServiceId, request.InstanceId, 
interval*(times+1))
+       err = h.cfg.AddHeartbeatTask(request.ServiceId, request.InstanceId, 
interval*(times+1))
        if err != nil {
                log.Error(fmt.Sprintf("heartbeat failed, instance[%s]. operator 
%s", request.InstanceId, remoteIP), err)
                resp := &pb.HeartbeatResponse{
@@ -116,7 +122,7 @@ func notInCacheStrategy(ctx context.Context, request 
*pb.HeartbeatRequest) (*pb.
        }
        err = updateInstance(ctx, request.ServiceId, request.InstanceId)
        if err != nil {
-               RemoveCacheInstance(request.InstanceId)
+               h.cfg.RemoveCacheInstance(request.InstanceId)
                log.Error(fmt.Sprintf("heartbeat failed, instance[%s]. operator 
%s", request.InstanceId, remoteIP), err)
                resp := &pb.HeartbeatResponse{
                        Response: 
pb.CreateResponseWithSCErr(pb.NewError(pb.ErrInstanceNotExists, err.Error())),
diff --git a/datasource/mongo/heartbeat/cache/heartbeatcache_test.go 
b/datasource/mongo/heartbeat/cache/heartbeatcache_test.go
index da2308a..37bb0c2 100644
--- a/datasource/mongo/heartbeat/cache/heartbeatcache_test.go
+++ b/datasource/mongo/heartbeat/cache/heartbeatcache_test.go
@@ -31,9 +31,10 @@ import (
        "github.com/apache/servicecomb-service-center/datasource/mongo/util"
 )
 
+var heartBeatCheck = &HeartBeatCache{cfg: configuration()}
+
 func TestHeartBeatCheck(t *testing.T) {
        t.Run("heartbeat check: instance does not exist,it should be failed", 
func(t *testing.T) {
-               heartBeatCheck := &HeartBeatCache{}
                resp, err := heartBeatCheck.Heartbeat(context.Background(), 
&pb.HeartbeatRequest{
                        ServiceId:  "serviceId1",
                        InstanceId: "not-exist-ins",
@@ -43,9 +44,8 @@ func TestHeartBeatCheck(t *testing.T) {
        })
 
        t.Run("heartbeat check: data exists in the cache,but not in db,it 
should be failed", func(t *testing.T) {
-               err := addHeartbeatTask("not-exist-svc", "not-exist-ins", 30)
+               err := heartBeatCheck.cfg.AddHeartbeatTask("not-exist-svc", 
"not-exist-ins", 30)
                assert.Nil(t, err)
-               heartBeatCheck := &HeartBeatCache{}
                resp, err := heartBeatCheck.Heartbeat(context.Background(), 
&pb.HeartbeatRequest{
                        ServiceId:  "serviceId1",
                        InstanceId: "not-exist-ins",
@@ -55,7 +55,6 @@ func TestHeartBeatCheck(t *testing.T) {
        })
 
        t.Run("heartbeat check: data exists in the cache and db,it can be 
update successfully", func(t *testing.T) {
-               heartBeatCheck := &HeartBeatCache{}
                instanceDB := model.Instance{
                        RefreshTime: time.Now(),
                        Instance: &pb.MicroServiceInstance{
@@ -73,7 +72,7 @@ func TestHeartBeatCheck(t *testing.T) {
                _, _ = client.GetMongoClient().Delete(context.Background(), 
model.CollectionInstance, filter)
                _, err := client.GetMongoClient().Insert(context.Background(), 
model.CollectionInstance, instanceDB)
                assert.Equal(t, nil, err)
-               err = addHeartbeatTask(instanceDB.Instance.ServiceId, 
instanceDB.Instance.InstanceId, 
instanceDB.Instance.HealthCheck.Interval*(instanceDB.Instance.HealthCheck.Times+1))
+               err = 
heartBeatCheck.cfg.AddHeartbeatTask(instanceDB.Instance.ServiceId, 
instanceDB.Instance.InstanceId, 
instanceDB.Instance.HealthCheck.Interval*(instanceDB.Instance.HealthCheck.Times+1))
                assert.Equal(t, nil, err)
                resp, err := heartBeatCheck.Heartbeat(context.Background(), 
&pb.HeartbeatRequest{
                        ServiceId:  "serviceIdDB",
diff --git a/datasource/mongo/heartbeat/healthcheck.go 
b/datasource/mongo/heartbeat/cache/types.go
similarity index 65%
copy from datasource/mongo/heartbeat/healthcheck.go
copy to datasource/mongo/heartbeat/cache/types.go
index 193b8c9..01da683 100644
--- a/datasource/mongo/heartbeat/healthcheck.go
+++ b/datasource/mongo/heartbeat/cache/types.go
@@ -3,26 +3,26 @@
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
  * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
+ * (the "License"); you may not use this file except request compliance with
  * the License.  You may obtain a copy of the License at
  *
  *     http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
+ * Unless required by applicable law or agreed to request writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
 
-package heartbeat
+package heartbeatcache
 
-import (
-       "context"
+import "time"
 
-       pb "github.com/go-chassis/cari/discovery"
-)
-
-type HealthCheck interface {
-       Heartbeat(ctx context.Context, request *pb.HeartbeatRequest) 
(*pb.HeartbeatResponse, error)
+// Store cache structure
+type instanceHeartbeatInfo struct {
+       serviceID   string
+       instanceID  string
+       ttl         int32
+       lastRefresh time.Time
 }
diff --git a/datasource/mongo/heartbeat/checker/heartbeatchecker.go 
b/datasource/mongo/heartbeat/checker/heartbeatchecker.go
index 5183185..b3522b3 100644
--- a/datasource/mongo/heartbeat/checker/heartbeatchecker.go
+++ b/datasource/mongo/heartbeat/checker/heartbeatchecker.go
@@ -54,3 +54,8 @@ func (h *HeartBeatChecker) Heartbeat(ctx context.Context, 
request *pb.HeartbeatR
                        "Update service instance heartbeat successfully."),
        }, nil
 }
+
+func (h *HeartBeatChecker) CheckInstance(ctx context.Context, instance 
*pb.MicroServiceInstance) error {
+       // do nothing
+       return nil
+}
diff --git a/datasource/mongo/heartbeat/healthcheck.go 
b/datasource/mongo/heartbeat/healthcheck.go
index 193b8c9..5cb9fa1 100644
--- a/datasource/mongo/heartbeat/healthcheck.go
+++ b/datasource/mongo/heartbeat/healthcheck.go
@@ -24,5 +24,8 @@ import (
 )
 
 type HealthCheck interface {
+       // processing heartbeat request
        Heartbeat(ctx context.Context, request *pb.HeartbeatRequest) 
(*pb.HeartbeatResponse, error)
+       // processing heartbeat check of instance after registration
+       CheckInstance(ctx context.Context, instance *pb.MicroServiceInstance) 
error
 }
diff --git a/datasource/mongo/ms.go b/datasource/mongo/ms.go
index 5d91558..3d224b8 100644
--- a/datasource/mongo/ms.go
+++ b/datasource/mongo/ms.go
@@ -1933,7 +1933,7 @@ func registryInstance(ctx context.Context, request 
*discovery.RegisterInstanceRe
        project := util.ParseProject(ctx)
        remoteIP := util.GetIPFromContext(ctx)
        instance := request.Instance
-       ttl := int64(instance.HealthCheck.Interval * 
(instance.HealthCheck.Times + 1))
+       ttl := instance.HealthCheck.Interval * (instance.HealthCheck.Times + 1)
 
        instanceFlag := fmt.Sprintf("ttl %ds, endpoints %v, host '%s', 
serviceID %s",
                ttl, instance.Endpoints, instance.HostName, instance.ServiceId)
@@ -1960,6 +1960,12 @@ func registryInstance(ctx context.Context, request 
*discovery.RegisterInstanceRe
                }, err
        }
 
+       // need to complete the instance offline function in time, so you need 
to check the heartbeat after registering the instance
+       err = heartbeat.Instance().CheckInstance(ctx, instance)
+       if err != nil {
+               log.Error(fmt.Sprintf("fail to check instance, instance[%s]. 
operator %s", instance.InstanceId, remoteIP), err)
+       }
+
        log.Info(fmt.Sprintf("register instance %s, instanceID %s, operator %s",
                instanceFlag, insertRes.InsertedID, remoteIP))
        return &discovery.RegisterInstanceResponse{

Reply via email to