This is an automated email from the ASF dual-hosted git repository.

humingcheng pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/servicecomb-service-center.git


The following commit(s) were added to refs/heads/dev by this push:
     new 775ee362 Do not return instances from the peer if the sync health check fails. (#1503)
775ee362 is described below

commit 775ee3626fe402130a9cc96feb006e0a6d2b2f73
Author: humingcheng <[email protected]>
AuthorDate: Fri Mar 7 00:44:36 2025 +0800

    Do not return instances from the peer service center when the sync health check fails. (#1503)
    
    * Do not return instances from the peer service center when the sync health check fails.
---
 datasource/etcd/cache/filter_instances.go          |  11 +-
 pkg/util/common.go                                 |  15 +-
 pkg/util/util.go                                   |  16 +++
 pkg/util/util_test.go                              |  83 +++++++++++
 server/service/disco/instance.go                   |   8 +-
 server/service/sync/sync.go                        |   5 +
 syncer/service/admin/check.go                      |  82 +++++++++++
 syncer/service/admin/check_test.go                 |  68 +++++++++
 syncer/service/admin/health.go                     |  29 +++-
 syncer/service/admin/instance_event_handler.go     | 138 ++++++++++++++++++
 .../service/admin/instance_event_handler_test.go   | 158 +++++++++++++++++++++
 syncer/service/admin/window.go                     |  32 +++++
 syncer/service/admin/window_test.go                |  71 +++++++++
 13 files changed, 701 insertions(+), 15 deletions(-)

diff --git a/datasource/etcd/cache/filter_instances.go 
b/datasource/etcd/cache/filter_instances.go
index 9b49c2c5..c2ed4288 100644
--- a/datasource/etcd/cache/filter_instances.go
+++ b/datasource/etcd/cache/filter_instances.go
@@ -29,6 +29,7 @@ import (
        "github.com/apache/servicecomb-service-center/datasource/etcd/util"
        "github.com/apache/servicecomb-service-center/pkg/cache"
        "github.com/apache/servicecomb-service-center/pkg/log"
+       util2 "github.com/apache/servicecomb-service-center/pkg/util"
 )
 
 type InstancesFilter struct {
@@ -91,14 +92,22 @@ func (f *InstancesFilter) findInstances(ctx 
context.Context, domainProject, serv
                return
        }
 
+       requiredProperties, _ := 
ctx.Value(util2.CtxRequiredInstancePropertiesOnDisco).(map[string]string)
        for _, kv := range resp.Kvs {
+               inst := kv.Value.(*pb.MicroServiceInstance)
+               if inst == nil {
+                       continue
+               }
+               if !util2.IsMapFullyMatch(inst.Properties, requiredProperties) {
+                       continue
+               }
                if i, ok := getOrCreateClustersIndex()[kv.ClusterName]; ok {
                        if kv.ModRevision > maxRevs[i] {
                                maxRevs[i] = kv.ModRevision
                        }
                        counts[i]++
                }
-               instances = append(instances, 
kv.Value.(*pb.MicroServiceInstance))
+               instances = append(instances, inst)
        }
        return
 }
diff --git a/pkg/util/common.go b/pkg/util/common.go
index b9661f9a..3de7dcb9 100644
--- a/pkg/util/common.go
+++ b/pkg/util/common.go
@@ -22,13 +22,14 @@ import "os"
 type CtxKey string
 
 const (
-       HeaderRev                  = "X-Resource-Revision"
-       CtxGlobal           CtxKey = "global"
-       CtxNocache          CtxKey = "noCache"
-       CtxCacheOnly        CtxKey = "cacheOnly"
-       CtxRequestRevision  CtxKey = "requestRev"
-       CtxResponseRevision CtxKey = "responseRev"
-       CtxEnableSync       CtxKey = "enableSync"
+       HeaderRev                                   = "X-Resource-Revision"
+       CtxGlobal                            CtxKey = "global"
+       CtxNocache                           CtxKey = "noCache"
+       CtxCacheOnly                         CtxKey = "cacheOnly"
+       CtxRequestRevision                   CtxKey = "requestRev"
+       CtxResponseRevision                  CtxKey = "responseRev"
+       CtxEnableSync                        CtxKey = "enableSync"
+       CtxRequiredInstancePropertiesOnDisco CtxKey = 
"requiredInstancePropertiesOnDisco"
 )
 
 func GetAppRoot() string {
diff --git a/pkg/util/util.go b/pkg/util/util.go
index 52551869..391a32fb 100644
--- a/pkg/util/util.go
+++ b/pkg/util/util.go
@@ -194,3 +194,19 @@ func GeneratePassword() (string, error) {
        }
        return pass, nil
 }
+
+func IsMapFullyMatch(source, required map[string]string) bool {
+       if len(required) == 0 {
+               return true
+       }
+       if len(source) < len(required) {
+               return false
+       }
+
+       for key, value := range required {
+               if resourceValue, exists := source[key]; !exists || 
resourceValue != value {
+                       return false
+               }
+       }
+       return true
+}
diff --git a/pkg/util/util_test.go b/pkg/util/util_test.go
index 4950ea5b..56b551f7 100644
--- a/pkg/util/util_test.go
+++ b/pkg/util/util_test.go
@@ -184,3 +184,86 @@ func TestGeneratePassword(t *testing.T) {
        assert.NoError(t, err)
        assert.Equal(t, 8, len(password), password)
 }
+
+func TestIsMapFullyMatch(t *testing.T) {
+       type args struct {
+               resource map[string]string
+               required map[string]string
+       }
+       tests := []struct {
+               name string
+               args args
+               want bool
+       }{
+               {
+                       name: "source/required nil",
+                       args: args{},
+                       want: true,
+               },
+               {
+                       name: "required nil",
+                       args: args{
+                               resource: map[string]string{
+                                       "k1": "v1",
+                               },
+                               required: nil,
+                       },
+                       want: true,
+               },
+               {
+                       name: "source nil",
+                       args: args{
+                               resource: nil,
+                               required: map[string]string{
+                                       "k1": "v1",
+                               },
+                       },
+                       want: false,
+               },
+               {
+                       name: "source is larger",
+                       args: args{
+                               resource: map[string]string{
+                                       "k1": "v1",
+                                       "k2": "v2",
+                               },
+                               required: map[string]string{
+                                       "k1": "v1",
+                               },
+                       },
+                       want: true,
+               },
+               {
+                       name: "source is smaller",
+                       args: args{
+                               resource: map[string]string{
+                                       "k1": "v1",
+                               },
+                               required: map[string]string{
+                                       "k1": "v1",
+                                       "k2": "v2",
+                               },
+                       },
+                       want: false,
+               },
+               {
+                       name: "source equals required",
+                       args: args{
+                               resource: map[string]string{
+                                       "k1": "v1",
+                                       "k2": "v2",
+                               },
+                               required: map[string]string{
+                                       "k1": "v1",
+                                       "k2": "v2",
+                               },
+                       },
+                       want: true,
+               },
+       }
+       for _, tt := range tests {
+               t.Run(tt.name, func(t *testing.T) {
+                       assert.Equalf(t, tt.want, 
IsMapFullyMatch(tt.args.resource, tt.args.required), "IsMapFullyMatch(%v, %v)", 
tt.args.resource, tt.args.required)
+               })
+       }
+}
diff --git a/server/service/disco/instance.go b/server/service/disco/instance.go
index 27ff878d..c232b655 100644
--- a/server/service/disco/instance.go
+++ b/server/service/disco/instance.go
@@ -51,7 +51,7 @@ var (
        propertiesMap map[string]string
 )
 
-func getInnerProperties() map[string]string {
+func GetInnerProperties() map[string]string {
        once.Do(func() {
                propertiesMap = 
config.GetStringMap("registry.instance.properties")
        })
@@ -144,7 +144,7 @@ func appendInnerPropertiesToInstance(instance 
*pb.MicroServiceInstance) {
                instance.Properties = make(map[string]string)
        }
 
-       innerProps := getInnerProperties()
+       innerProps := GetInnerProperties()
        if len(innerProps) <= 0 {
                return
        }
@@ -215,7 +215,7 @@ func appendInnerProperties(ctx context.Context, serviceID 
string, instanceID str
 
 func shouldAppendInnerProperties(instance *pb.MicroServiceInstance) bool {
        instProps := instance.Properties
-       innerProps := getInnerProperties()
+       innerProps := GetInnerProperties()
        if len(innerProps) == 0 {
                return false
        }
@@ -457,7 +457,7 @@ func PutInstanceProperties(ctx context.Context, in 
*pb.UpdateInstancePropsReques
                return pb.NewError(pb.ErrInvalidParams, err.Error())
        }
 
-       properties := getInnerProperties()
+       properties := GetInnerProperties()
        if in.Properties == nil {
                in.Properties = make(map[string]string, len(properties))
        }
diff --git a/server/service/sync/sync.go b/server/service/sync/sync.go
index 5b6f26e0..e327c635 100644
--- a/server/service/sync/sync.go
+++ b/server/service/sync/sync.go
@@ -23,6 +23,8 @@ import (
 
        "github.com/apache/servicecomb-service-center/pkg/util"
        "github.com/apache/servicecomb-service-center/server/config"
+       "github.com/apache/servicecomb-service-center/server/service/disco"
+       "github.com/apache/servicecomb-service-center/syncer/service/admin"
 )
 
 var (
@@ -40,6 +42,9 @@ func Enable() bool {
 }
 
 func SetContext(ctx context.Context) context.Context {
+       if Enable() && !admin.ShouldTrustPeerServer() { // 
不信任对端SC,则服务发现时,只保留在本SC注册的实例
+               util.SetContext(ctx, util.CtxRequiredInstancePropertiesOnDisco, 
disco.GetInnerProperties())
+       }
        var val string
        if Enable() {
                val = "1"
diff --git a/syncer/service/admin/check.go b/syncer/service/admin/check.go
new file mode 100644
index 00000000..3934631a
--- /dev/null
+++ b/syncer/service/admin/check.go
@@ -0,0 +1,82 @@
+package admin
+
+import (
+       "context"
+       "fmt"
+       "time"
+
+       "github.com/go-chassis/foundation/gopool"
+
+       "github.com/apache/servicecomb-service-center/pkg/log"
+       "github.com/apache/servicecomb-service-center/syncer/rpc"
+)
+
+func ShouldTrustPeerServer() bool {
+       return globalHealthChecker.ShouldTrustPeerServer()
+}
+
+type HealthChecker struct {
+       checkIntervalBySecond uint
+       latestCheckResult     *Resp
+       latestCheckErr        error
+
+       // 为了容忍网络抖动,使用滑动窗口判断状态
+       failureWindow *HealthCheckWindow
+       // 恢复窗口,成功率要求更高,用于数据同步的恢复
+       recoveryWindow        *HealthCheckWindow
+       shouldTrustPeerServer bool
+}
+
+func (h *HealthChecker) check() {
+       h.latestCheckResult, h.latestCheckErr = checkPeerStatus()
+       if h.latestCheckErr != nil || h.latestCheckResult == nil || 
len(h.latestCheckResult.Peers) == 0 {
+               h.AddResult(false)
+               return
+       }
+       if h.latestCheckResult.Peers[0].Status != rpc.HealthStatusConnected {
+               h.AddResult(false)
+               return
+       }
+       h.AddResult(true)
+}
+
+func (h *HealthChecker) ShouldTrustPeerServer() bool {
+       return h.shouldTrustPeerServer
+}
+
+func (h *HealthChecker) AddResult(pass bool) {
+       h.failureWindow.AddResult(pass)
+       h.recoveryWindow.AddResult(pass)
+
+       shouldTrustPeerServerNew := true
+       if h.shouldTrustPeerServer {
+               // 健康 > 不健康
+               shouldTrustPeerServerNew = h.failureWindow.IsHealthy()
+       } else {
+               // 不健康 > 健康
+               shouldTrustPeerServerNew = h.recoveryWindow.IsHealthy()
+       }
+       if h.shouldTrustPeerServer != shouldTrustPeerServerNew {
+               log.Info(fmt.Sprintf("should trust peer server changed, old: 
%v, new: %v", h.shouldTrustPeerServer, shouldTrustPeerServerNew))
+               h.shouldTrustPeerServer = shouldTrustPeerServerNew
+               return
+       }
+}
+
+func (h *HealthChecker) LatestHealthCheckResult() (*Resp, error) {
+       return h.latestCheckResult, h.latestCheckErr
+}
+
+func (h *HealthChecker) RunChecker() {
+       h.check()
+       gopool.Go(func(ctx context.Context) {
+               for {
+                       select {
+                       case <-ctx.Done():
+                               return
+                       case 
<-time.After(time.Duration(h.checkIntervalBySecond) * time.Second):
+                               h.check()
+                       }
+               }
+       })
+}
diff --git a/syncer/service/admin/check_test.go 
b/syncer/service/admin/check_test.go
new file mode 100644
index 00000000..f1a22dc9
--- /dev/null
+++ b/syncer/service/admin/check_test.go
@@ -0,0 +1,68 @@
+package admin
+
+import (
+       "testing"
+
+       "github.com/stretchr/testify/assert"
+)
+
+func TestHealthChecker_AddResult(t *testing.T) {
+       h := &HealthChecker{
+               checkIntervalBySecond: 1,
+               failureWindow:         NewHealthCheckWindow(8, 5),
+               recoveryWindow:        NewHealthCheckWindow(4, 2),
+               shouldTrustPeerServer: true,
+       }
+       // 全部true
+       for i := 0; i < 10; i++ {
+               h.AddResult(true)
+               assert.True(t, h.failureWindow.IsHealthy())
+               assert.True(t, h.recoveryWindow.IsHealthy())
+               assert.True(t, h.ShouldTrustPeerServer())
+       }
+
+       // t t t t f f f f
+       h.AddResult(false)
+       h.AddResult(false)
+       h.AddResult(false)
+       h.AddResult(false)
+       assert.True(t, h.failureWindow.IsHealthy())
+       assert.False(t, h.recoveryWindow.IsHealthy()) // recoveryWindow 首先变成失败
+       assert.True(t, h.ShouldTrustPeerServer())     // 结果还是健康,因为 健康 > 不健康,看 
failureWindow
+
+       // t t t f f f f f
+       h.AddResult(false)
+       assert.False(t, h.failureWindow.IsHealthy())
+       assert.False(t, h.recoveryWindow.IsHealthy())
+       assert.False(t, h.ShouldTrustPeerServer()) // 不健康
+
+       // 全false
+       for i := 0; i < 10; i++ {
+               h.AddResult(false)
+               assert.False(t, h.failureWindow.IsHealthy())
+               assert.False(t, h.recoveryWindow.IsHealthy())
+               assert.False(t, h.ShouldTrustPeerServer())
+       }
+       assert.ElementsMatch(t, []bool{false, false, false, false, false, 
false, false, false}, h.failureWindow.checkPassResults)
+       assert.ElementsMatch(t, []bool{false, false, false, false}, 
h.recoveryWindow.checkPassResults)
+
+       h.AddResult(true)
+       h.AddResult(false)
+       h.AddResult(true)
+       h.AddResult(false)
+       h.AddResult(true)
+       h.AddResult(false)
+       h.AddResult(true)
+       h.AddResult(false)
+       assert.ElementsMatch(t, []bool{true, false, true, false, true, false, 
true, false}, h.failureWindow.checkPassResults)
+       assert.ElementsMatch(t, []bool{true, false, true, false}, 
h.recoveryWindow.checkPassResults)
+       assert.True(t, h.failureWindow.IsHealthy())   // failureWindow 恢复
+       assert.False(t, h.recoveryWindow.IsHealthy()) // 但是 recoveryWindow 还没恢复
+       assert.False(t, h.ShouldTrustPeerServer())    // 结果是不健康,因为不健康 > 健康,看 
recoveryWindow
+
+       h.AddResult(true)
+       h.AddResult(true)
+       assert.True(t, h.failureWindow.IsHealthy())
+       assert.True(t, h.recoveryWindow.IsHealthy())
+       assert.True(t, h.ShouldTrustPeerServer()) // 结果健康
+}
diff --git a/syncer/service/admin/health.go b/syncer/service/admin/health.go
index c47ae420..134ac1a5 100644
--- a/syncer/service/admin/health.go
+++ b/syncer/service/admin/health.go
@@ -30,6 +30,8 @@ import (
        "google.golang.org/grpc/status"
 
        "github.com/apache/servicecomb-service-center/client"
+       "github.com/apache/servicecomb-service-center/datasource/etcd/event"
+       
"github.com/apache/servicecomb-service-center/datasource/etcd/state/kvstore"
        "github.com/apache/servicecomb-service-center/pkg/log"
        pkgrpc "github.com/apache/servicecomb-service-center/pkg/rpc"
        
"github.com/apache/servicecomb-service-center/server/plugin/security/cipher"
@@ -43,11 +45,16 @@ import (
 const (
        scheme      = "grpc"
        serviceName = "syncer"
+
+       // eventPushCheckIntervalBySecond is suggested to be 
checkPeerHealthIntervalBySecond*1/3
+       checkPeerHealthIntervalBySecond = 15
+       eventPushCheckIntervalBySecond  = 5
 )
 
 var (
-       peerInfos        []*PeerInfo
-       ErrConfigIsEmpty = errors.New("sync config is empty")
+       peerInfos           []*PeerInfo
+       ErrConfigIsEmpty    = errors.New("sync config is empty")
+       globalHealthChecker *HealthChecker
 )
 
 type Resp struct {
@@ -98,9 +105,21 @@ func Init() {
                }
                peerInfos = append(peerInfos, &PeerInfo{Peer: p, ClientConn: 
conn})
        }
+
+       globalHealthChecker = &HealthChecker{
+               checkIntervalBySecond: checkPeerHealthIntervalBySecond,
+               // 失败阈值,最近8次检查,5次失败即视为不健康,120s。
+               failureWindow: NewHealthCheckWindow(8, 5),
+               // 恢复阈值,最近6次检查,最多2次失败即视为不健康,即最多1次失败,90s。
+               recoveryWindow:        NewHealthCheckWindow(6, 2),
+               shouldTrustPeerServer: true, // 
默认信任对端,只有通过检查,确认对端无法连接,即两个SC同步异常,才认为对端数据不可信任
+       }
+       globalHealthChecker.RunChecker()
+       h := NewInstanceEventHandler(globalHealthChecker, 
eventPushCheckIntervalBySecond*time.Second, event.NewInstanceEventHandler())
+       kvstore.AddEventHandler(h)
 }
 
-func Health() (*Resp, error) {
+func checkPeerStatus() (*Resp, error) {
        if len(peerInfos) <= 0 {
                return nil, ErrConfigIsEmpty
        }
@@ -124,6 +143,10 @@ func Health() (*Resp, error) {
        return resp, nil
 }
 
+func Health() (*Resp, error) {
+       return checkPeerStatus()
+}
+
 func getPeerStatus(peerInfo *PeerInfo) string {
        if peerInfo.ClientConn == nil {
                log.Warn("clientConn is nil")
diff --git a/syncer/service/admin/instance_event_handler.go 
b/syncer/service/admin/instance_event_handler.go
new file mode 100644
index 00000000..928f055a
--- /dev/null
+++ b/syncer/service/admin/instance_event_handler.go
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package admin
+
+import (
+       "context"
+       "fmt"
+       "sync"
+       "time"
+
+       pb "github.com/go-chassis/cari/discovery"
+       "github.com/go-chassis/foundation/gopool"
+
+       "github.com/apache/servicecomb-service-center/datasource"
+       "github.com/apache/servicecomb-service-center/datasource/etcd/sd"
+       
"github.com/apache/servicecomb-service-center/datasource/etcd/state/kvstore"
+       "github.com/apache/servicecomb-service-center/pkg/log"
+       "github.com/apache/servicecomb-service-center/pkg/util"
+       "github.com/apache/servicecomb-service-center/server/service/disco"
+)
+
+// InstanceEventHandler 对端SC同步异常期间,无法查询到在对端SC注册的实例,因此这期间对端SC的实例的事件,对客户端而言是无效事件,
+// 需要在对端SC同步恢复后,处理一次
+type InstanceEventHandler struct {
+       handler       kvstore.EventHandler
+       interval      time.Duration
+       healthChecker *HealthChecker
+
+       events     map[string]kvstore.Event
+       eventsLock sync.Mutex
+}
+
+func (h *InstanceEventHandler) Type() kvstore.Type {
+       return sd.TypeInstance
+}
+
+func (h *InstanceEventHandler) OnEvent(evt kvstore.Event) {
+       if h.healthChecker.ShouldTrustPeerServer() {
+               return
+       }
+       instance, ignore := checkShouldIgnoreEvent(evt)
+       if ignore {
+               return
+       }
+       h.add(evt, instance)
+}
+
+func (h *InstanceEventHandler) exportAndClearEvents() []kvstore.Event {
+       h.eventsLock.Lock()
+       defer h.eventsLock.Unlock()
+       result := make([]kvstore.Event, 0, len(h.events))
+       for _, evt := range h.events {
+               result = append(result, evt)
+       }
+       h.events = make(map[string]kvstore.Event)
+       return result
+}
+
+func (h *InstanceEventHandler) add(evt kvstore.Event, instance 
*pb.MicroServiceInstance) {
+       h.eventsLock.Lock()
+       defer h.eventsLock.Unlock()
+       _, ok := h.events[instance.ServiceId] // 每个服务仅保留一个事件
+       if !ok {
+               evt.Type = pb.EVT_UPDATE // 统一改为刷新事件,避免影响配额
+               h.events[instance.ServiceId] = evt
+       }
+}
+
+func (h *InstanceEventHandler) run() {
+       gopool.Go(func(ctx context.Context) {
+               for {
+                       select {
+                       case <-ctx.Done():
+                               return
+                       case <-time.After(h.interval):
+                               if !h.healthChecker.ShouldTrustPeerServer() {
+                                       continue
+                               }
+                               // 对端SC恢复后,再重新处理事件
+                               events := h.exportAndClearEvents()
+                               h.handleAgain(events)
+                       }
+               }
+       })
+}
+func (h *InstanceEventHandler) handleAgain(events []kvstore.Event) {
+       for _, evt := range events {
+               h.handler.OnEvent(evt)
+       }
+}
+
+func NewInstanceEventHandler(healthChecker *HealthChecker, interval 
time.Duration, handler kvstore.EventHandler) *InstanceEventHandler {
+       h := &InstanceEventHandler{
+               handler:       handler,
+               events:        make(map[string]kvstore.Event),
+               eventsLock:    sync.Mutex{},
+               interval:      interval,
+               healthChecker: healthChecker,
+       }
+       h.run()
+       return h
+}
+
+func checkShouldIgnoreEvent(evt kvstore.Event) (storedInstance 
*pb.MicroServiceInstance, ignore bool) {
+       action := evt.Type
+       // 初始化是内部事件,忽略
+       // 本来就查不到,推了删除事件后还是查不到,因此删除事件也忽略
+       if action == pb.EVT_INIT || action == pb.EVT_DELETE {
+               return nil, true
+       }
+       instance, ok := evt.KV.Value.(*pb.MicroServiceInstance)
+       if !ok {
+               log.Error("failed to assert microServiceInstance", 
datasource.ErrAssertFail)
+               return nil, true
+       }
+       // 连接在本SC的实例,忽略
+       if util.IsMapFullyMatch(instance.Properties, 
disco.GetInnerProperties()) {
+               return nil, true
+       }
+       log.Info(fmt.Sprintf("caught [%s] service[%s] instance[%s] event, 
endpoints %v, will handle it again when peer sc recovery",
+               action, instance.ServiceId, instance.InstanceId, 
instance.Endpoints))
+       return instance, false
+}
diff --git a/syncer/service/admin/instance_event_handler_test.go 
b/syncer/service/admin/instance_event_handler_test.go
new file mode 100644
index 00000000..fd4ac0d1
--- /dev/null
+++ b/syncer/service/admin/instance_event_handler_test.go
@@ -0,0 +1,158 @@
+package admin
+
+import (
+       "sync"
+       "testing"
+       "time"
+
+       pb "github.com/go-chassis/cari/discovery"
+       "github.com/stretchr/testify/assert"
+
+       "github.com/apache/servicecomb-service-center/datasource/etcd/sd"
+       
"github.com/apache/servicecomb-service-center/datasource/etcd/state/kvstore"
+)
+
+type mockEventHandler struct {
+       processedEvts []kvstore.Event
+}
+
+func (h *mockEventHandler) Type() kvstore.Type {
+       return sd.TypeInstance
+}
+
+func (h *mockEventHandler) OnEvent(evt kvstore.Event) {
+       h.processedEvts = append(h.processedEvts, evt)
+}
+
+func TestInstanceEventHandler_OnEvent(t *testing.T) {
+       h := NewInstanceEventHandler(&HealthChecker{
+               shouldTrustPeerServer: false,
+       }, 100*time.Millisecond, kvstore.EventHandler(nil))
+
+       evt := kvstore.Event{
+               Type: pb.EVT_CREATE,
+               KV: &kvstore.KeyValue{
+                       Value: &pb.MicroServiceInstance{
+                               InstanceId: "test_instnace",
+                               ServiceId:  "test_service",
+                       },
+               },
+       }
+       h.OnEvent(evt)
+       assert.Equal(t, 1, len(h.events))
+
+       // 信任对端SC,不处理
+       h.events = make(map[string]kvstore.Event)
+       h.healthChecker.shouldTrustPeerServer = true
+       h.OnEvent(evt)
+       assert.Equal(t, 0, len(h.events))
+
+       h.events = make(map[string]kvstore.Event)
+       // 忽略的事件,不处理
+       evt.Type = pb.EVT_INIT
+       h.healthChecker.shouldTrustPeerServer = false
+       h.OnEvent(evt)
+       assert.Equal(t, 0, len(h.events))
+
+       // 处理
+       evt.Type = pb.EVT_CREATE
+       h.OnEvent(evt)
+       assert.Equal(t, 1, len(h.events))
+}
+
+func Test_checkShouldIgnoreEvent(t *testing.T) {
+       instanceDef := &pb.MicroServiceInstance{
+               InstanceId: "test_instnace",
+               ServiceId:  "test_service",
+               Properties: map[string]string{
+                       "engineID":   "test_engineID_fake",
+                       "engineName": "test_engineName",
+               },
+       }
+
+       evt := kvstore.Event{
+               Type: pb.EVT_CREATE,
+               KV: &kvstore.KeyValue{
+                       Value: instanceDef,
+               },
+       }
+       instance, ignore := checkShouldIgnoreEvent(evt)
+       assert.False(t, ignore)
+       assert.Equal(t, instanceDef, instance)
+
+       evt.Type = pb.EVT_INIT
+       _, ignore = checkShouldIgnoreEvent(evt)
+       assert.True(t, ignore)
+       evt.Type = pb.EVT_DELETE
+       _, ignore = checkShouldIgnoreEvent(evt)
+       assert.True(t, ignore)
+       evt.Type = pb.EVT_UPDATE
+       _, ignore = checkShouldIgnoreEvent(evt)
+       assert.False(t, ignore)
+
+       evt.KV.Value = 1
+       _, ignore = checkShouldIgnoreEvent(evt)
+       assert.True(t, ignore)
+       evt.KV.Value = instanceDef
+       _, ignore = checkShouldIgnoreEvent(evt)
+       assert.False(t, ignore)
+
+       instanceDef.Properties["engineID"] = "test_engineID"
+       _, ignore = checkShouldIgnoreEvent(evt)
+       assert.True(t, ignore)
+       instanceDef.Properties["engineID"] = "test_engineID_fake"
+       _, ignore = checkShouldIgnoreEvent(evt)
+       assert.False(t, ignore)
+}
+
+func TestInstanceEventHandler_run(t *testing.T) {
+       mockHandler := &mockEventHandler{}
+       h := NewInstanceEventHandler(&HealthChecker{
+               shouldTrustPeerServer: false,
+       }, 100*time.Millisecond, mockHandler)
+
+       var wg sync.WaitGroup
+       numGoroutines := 50
+       for i := 0; i < numGoroutines; i++ {
+               wg.Add(1)
+               go func() {
+                       defer wg.Done()
+                       h.OnEvent(kvstore.Event{
+                               Type: pb.EVT_CREATE,
+                               KV: &kvstore.KeyValue{
+                                       Value: &pb.MicroServiceInstance{
+                                               InstanceId: "test_instance",
+                                               ServiceId:  "test_service",
+                                       },
+                               },
+                       })
+                       h.OnEvent(kvstore.Event{
+                               Type: pb.EVT_CREATE,
+                               KV: &kvstore.KeyValue{
+                                       Value: &pb.MicroServiceInstance{
+                                               InstanceId: "test_instance1",
+                                               ServiceId:  "test_service1",
+                                       },
+                               },
+                       })
+               }()
+       }
+       wg.Wait()
+       assert.Equal(t, 2, len(h.events)) // 一个服务多个事件仅保留一个
+       assert.Equal(t, 0, len(mockHandler.processedEvts))
+
+       time.Sleep(110 * time.Millisecond)
+       assert.Equal(t, 2, len(h.events))
+       assert.Equal(t, 0, len(mockHandler.processedEvts)) // 对端不可信,只保存,不处理
+
+       h.healthChecker.shouldTrustPeerServer = true
+       time.Sleep(110 * time.Millisecond)
+       assert.Equal(t, 0, len(h.events))                                  // 
对端可信,处理事件,事件处理完成清空
+       assert.Equal(t, 2, len(mockHandler.processedEvts))                 // 
处理事件
+       assert.ElementsMatch(t, []string{"test_service", "test_service1"}, // 
处理服务为设置的服务
+               
[]string{mockHandler.processedEvts[0].KV.Value.(*pb.MicroServiceInstance).ServiceId,
 mockHandler.processedEvts[1].KV.Value.(*pb.MicroServiceInstance).ServiceId})
+
+       for _, evt := range mockHandler.processedEvts {
+               assert.Equal(t, pb.EVT_UPDATE, evt.Type)
+       }
+}
diff --git a/syncer/service/admin/window.go b/syncer/service/admin/window.go
new file mode 100644
index 00000000..04c450c4
--- /dev/null
+++ b/syncer/service/admin/window.go
@@ -0,0 +1,32 @@
+package admin
+
+type HealthCheckWindow struct {
+       windowSize       int
+       checkPassResults []bool
+       failureThreshold int
+}
+
+func NewHealthCheckWindow(windowSize int, failureThreshold int) 
*HealthCheckWindow {
+       return &HealthCheckWindow{
+               windowSize:       windowSize,
+               checkPassResults: make([]bool, 0, windowSize),
+               failureThreshold: failureThreshold,
+       }
+}
+
+func (h *HealthCheckWindow) AddResult(pass bool) {
+       h.checkPassResults = append(h.checkPassResults, pass)
+       if len(h.checkPassResults) > h.windowSize {
+               h.checkPassResults = h.checkPassResults[1:]
+       }
+}
+
+func (h *HealthCheckWindow) IsHealthy() bool {
+       failureCount := 0
+       for _, pass := range h.checkPassResults {
+               if !pass {
+                       failureCount++
+               }
+       }
+       return failureCount < h.failureThreshold
+}
diff --git a/syncer/service/admin/window_test.go 
b/syncer/service/admin/window_test.go
new file mode 100644
index 00000000..3f700467
--- /dev/null
+++ b/syncer/service/admin/window_test.go
@@ -0,0 +1,71 @@
+package admin
+
+import (
+       "testing"
+
+       "github.com/stretchr/testify/assert"
+)
+
+func TestHealthCheckWindow_AddResult(t *testing.T) {
+       c := NewHealthCheckWindow(5, 2)
+       for i := 0; i < 5; i++ {
+               c.AddResult(true)
+               assert.Equal(t, i+1, len(c.checkPassResults))
+       }
+       for i := 0; i < 5; i++ {
+               c.AddResult(true)
+               assert.Equal(t, c.windowSize, len(c.checkPassResults))
+       }
+       c.AddResult(false)
+       c.AddResult(false)
+       assert.ElementsMatch(t, []bool{true, true, true, false, false}, 
c.checkPassResults)
+}
+
+func TestHealthCheckWindow_IsHealthy(t *testing.T) {
+       c := NewHealthCheckWindow(4, 2)
+
+       // f 健康
+       c.AddResult(false)
+       assert.True(t, c.IsHealthy())
+
+       // 达到2次失败,不健康
+       for i := 0; i < 5; i++ {
+               c.AddResult(false)
+               assert.False(t, c.IsHealthy())
+       }
+       // 结束状态:f f f f
+
+       // f f f t 3次失败
+       c.AddResult(true)
+       assert.False(t, c.IsHealthy())
+
+       // f f t t 2次失败
+       c.AddResult(true)
+       assert.False(t, c.IsHealthy())
+
+       // f t t t 1次失败,变为健康
+       c.AddResult(true)
+       assert.True(t, c.IsHealthy())
+
+       // t t t f 1次失败
+       c.AddResult(false)
+       assert.True(t, c.IsHealthy())
+
+       // t t f f 2次失败,变为不健康
+       c.AddResult(false)
+       assert.False(t, c.IsHealthy())
+
+       // t f f t 2次失败
+       c.AddResult(true)
+       assert.False(t, c.IsHealthy())
+
+       // f f t t 2次失败
+       c.AddResult(true)
+       assert.False(t, c.IsHealthy())
+
+       // 小于2次失败,变为成功
+       for i := 0; i < 4; i++ {
+               c.AddResult(true)
+               assert.True(t, c.IsHealthy())
+       }
+}

Reply via email to