This is an automated email from the ASF dual-hosted git repository.
humingcheng pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/servicecomb-service-center.git
The following commit(s) were added to refs/heads/dev by this push:
new 775ee362 Do not return the instances from peer, if sync health check
failed. (#1503)
775ee362 is described below
commit 775ee3626fe402130a9cc96feb006e0a6d2b2f73
Author: humingcheng <[email protected]>
AuthorDate: Fri Mar 7 00:44:36 2025 +0800
Do not return the instances from peer, if sync health check failed. (#1503)
* Do not return the instances from peer, if sync health check failed.
---
datasource/etcd/cache/filter_instances.go | 11 +-
pkg/util/common.go | 15 +-
pkg/util/util.go | 16 +++
pkg/util/util_test.go | 83 +++++++++++
server/service/disco/instance.go | 8 +-
server/service/sync/sync.go | 5 +
syncer/service/admin/check.go | 82 +++++++++++
syncer/service/admin/check_test.go | 68 +++++++++
syncer/service/admin/health.go | 29 +++-
syncer/service/admin/instance_event_handler.go | 138 ++++++++++++++++++
.../service/admin/instance_event_handler_test.go | 158 +++++++++++++++++++++
syncer/service/admin/window.go | 32 +++++
syncer/service/admin/window_test.go | 71 +++++++++
13 files changed, 701 insertions(+), 15 deletions(-)
diff --git a/datasource/etcd/cache/filter_instances.go
b/datasource/etcd/cache/filter_instances.go
index 9b49c2c5..c2ed4288 100644
--- a/datasource/etcd/cache/filter_instances.go
+++ b/datasource/etcd/cache/filter_instances.go
@@ -29,6 +29,7 @@ import (
"github.com/apache/servicecomb-service-center/datasource/etcd/util"
"github.com/apache/servicecomb-service-center/pkg/cache"
"github.com/apache/servicecomb-service-center/pkg/log"
+ util2 "github.com/apache/servicecomb-service-center/pkg/util"
)
type InstancesFilter struct {
@@ -91,14 +92,22 @@ func (f *InstancesFilter) findInstances(ctx
context.Context, domainProject, serv
return
}
+ requiredProperties, _ :=
ctx.Value(util2.CtxRequiredInstancePropertiesOnDisco).(map[string]string)
for _, kv := range resp.Kvs {
+ inst := kv.Value.(*pb.MicroServiceInstance)
+ if inst == nil {
+ continue
+ }
+ if !util2.IsMapFullyMatch(inst.Properties, requiredProperties) {
+ continue
+ }
if i, ok := getOrCreateClustersIndex()[kv.ClusterName]; ok {
if kv.ModRevision > maxRevs[i] {
maxRevs[i] = kv.ModRevision
}
counts[i]++
}
- instances = append(instances,
kv.Value.(*pb.MicroServiceInstance))
+ instances = append(instances, inst)
}
return
}
diff --git a/pkg/util/common.go b/pkg/util/common.go
index b9661f9a..3de7dcb9 100644
--- a/pkg/util/common.go
+++ b/pkg/util/common.go
@@ -22,13 +22,14 @@ import "os"
type CtxKey string
const (
- HeaderRev = "X-Resource-Revision"
- CtxGlobal CtxKey = "global"
- CtxNocache CtxKey = "noCache"
- CtxCacheOnly CtxKey = "cacheOnly"
- CtxRequestRevision CtxKey = "requestRev"
- CtxResponseRevision CtxKey = "responseRev"
- CtxEnableSync CtxKey = "enableSync"
+ HeaderRev = "X-Resource-Revision"
+ CtxGlobal CtxKey = "global"
+ CtxNocache CtxKey = "noCache"
+ CtxCacheOnly CtxKey = "cacheOnly"
+ CtxRequestRevision CtxKey = "requestRev"
+ CtxResponseRevision CtxKey = "responseRev"
+ CtxEnableSync CtxKey = "enableSync"
+ CtxRequiredInstancePropertiesOnDisco CtxKey =
"requiredInstancePropertiesOnDisco"
)
func GetAppRoot() string {
diff --git a/pkg/util/util.go b/pkg/util/util.go
index 52551869..391a32fb 100644
--- a/pkg/util/util.go
+++ b/pkg/util/util.go
@@ -194,3 +194,19 @@ func GeneratePassword() (string, error) {
}
return pass, nil
}
+
// IsMapFullyMatch reports whether every key/value pair of required is present
// in source with exactly the same value. A nil or empty required map matches
// any source, including a nil one.
func IsMapFullyMatch(source, required map[string]string) bool {
	switch {
	case len(required) == 0:
		return true
	case len(source) < len(required):
		// source has too few entries to contain every required pair
		return false
	}
	for key, want := range required {
		got, present := source[key]
		// present must be checked explicitly: a missing key yields "" which
		// would otherwise equal a required empty value
		if !present || got != want {
			return false
		}
	}
	return true
}
diff --git a/pkg/util/util_test.go b/pkg/util/util_test.go
index 4950ea5b..56b551f7 100644
--- a/pkg/util/util_test.go
+++ b/pkg/util/util_test.go
@@ -184,3 +184,86 @@ func TestGeneratePassword(t *testing.T) {
assert.NoError(t, err)
assert.Equal(t, 8, len(password), password)
}
+
// TestIsMapFullyMatch covers nil inputs, size short-circuits, value
// mismatches and missing keys for IsMapFullyMatch.
func TestIsMapFullyMatch(t *testing.T) {
	type args struct {
		resource map[string]string
		required map[string]string
	}
	tests := []struct {
		name string
		args args
		want bool
	}{
		{
			name: "source/required nil",
			args: args{},
			want: true,
		},
		{
			name: "required nil",
			args: args{
				resource: map[string]string{"k1": "v1"},
				required: nil,
			},
			want: true,
		},
		{
			name: "source nil",
			args: args{
				resource: nil,
				required: map[string]string{"k1": "v1"},
			},
			want: false,
		},
		{
			name: "source is larger",
			args: args{
				resource: map[string]string{"k1": "v1", "k2": "v2"},
				required: map[string]string{"k1": "v1"},
			},
			want: true,
		},
		{
			name: "source is smaller",
			args: args{
				resource: map[string]string{"k1": "v1"},
				required: map[string]string{"k1": "v1", "k2": "v2"},
			},
			want: false,
		},
		{
			name: "source equals required",
			args: args{
				resource: map[string]string{"k1": "v1", "k2": "v2"},
				required: map[string]string{"k1": "v1", "k2": "v2"},
			},
			want: true,
		},
		{
			// key present in both maps but with different values
			name: "value mismatch",
			args: args{
				resource: map[string]string{"k1": "v1", "k2": "v2"},
				required: map[string]string{"k1": "other"},
			},
			want: false,
		},
		{
			// same size, so the length short-circuit does not apply
			name: "key missing with same size",
			args: args{
				resource: map[string]string{"k1": "v1"},
				required: map[string]string{"k2": "v1"},
			},
			want: false,
		},
		{
			// a missing key reads as "", which must not satisfy a required ""
			name: "required empty value on missing key",
			args: args{
				resource: map[string]string{"k1": "v1"},
				required: map[string]string{"k2": ""},
			},
			want: false,
		},
	}
	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			assert.Equalf(t, tt.want, IsMapFullyMatch(tt.args.resource, tt.args.required),
				"IsMapFullyMatch(%v, %v)", tt.args.resource, tt.args.required)
		})
	}
}
diff --git a/server/service/disco/instance.go b/server/service/disco/instance.go
index 27ff878d..c232b655 100644
--- a/server/service/disco/instance.go
+++ b/server/service/disco/instance.go
@@ -51,7 +51,7 @@ var (
propertiesMap map[string]string
)
-func getInnerProperties() map[string]string {
+func GetInnerProperties() map[string]string {
once.Do(func() {
propertiesMap =
config.GetStringMap("registry.instance.properties")
})
@@ -144,7 +144,7 @@ func appendInnerPropertiesToInstance(instance
*pb.MicroServiceInstance) {
instance.Properties = make(map[string]string)
}
- innerProps := getInnerProperties()
+ innerProps := GetInnerProperties()
if len(innerProps) <= 0 {
return
}
@@ -215,7 +215,7 @@ func appendInnerProperties(ctx context.Context, serviceID
string, instanceID str
func shouldAppendInnerProperties(instance *pb.MicroServiceInstance) bool {
instProps := instance.Properties
- innerProps := getInnerProperties()
+ innerProps := GetInnerProperties()
if len(innerProps) == 0 {
return false
}
@@ -457,7 +457,7 @@ func PutInstanceProperties(ctx context.Context, in
*pb.UpdateInstancePropsReques
return pb.NewError(pb.ErrInvalidParams, err.Error())
}
- properties := getInnerProperties()
+ properties := GetInnerProperties()
if in.Properties == nil {
in.Properties = make(map[string]string, len(properties))
}
diff --git a/server/service/sync/sync.go b/server/service/sync/sync.go
index 5b6f26e0..e327c635 100644
--- a/server/service/sync/sync.go
+++ b/server/service/sync/sync.go
@@ -23,6 +23,8 @@ import (
"github.com/apache/servicecomb-service-center/pkg/util"
"github.com/apache/servicecomb-service-center/server/config"
+ "github.com/apache/servicecomb-service-center/server/service/disco"
+ "github.com/apache/servicecomb-service-center/syncer/service/admin"
)
var (
@@ -40,6 +42,9 @@ func Enable() bool {
}
func SetContext(ctx context.Context) context.Context {
+ if Enable() && !admin.ShouldTrustPeerServer() { //
不信任对端SC,则服务发现时,只保留在本SC注册的实例
+ util.SetContext(ctx, util.CtxRequiredInstancePropertiesOnDisco,
disco.GetInnerProperties())
+ }
var val string
if Enable() {
val = "1"
diff --git a/syncer/service/admin/check.go b/syncer/service/admin/check.go
new file mode 100644
index 00000000..3934631a
--- /dev/null
+++ b/syncer/service/admin/check.go
@@ -0,0 +1,82 @@
+package admin
+
import (
	"context"
	"fmt"
	"sync"
	"time"

	"github.com/go-chassis/foundation/gopool"

	"github.com/apache/servicecomb-service-center/pkg/log"
	"github.com/apache/servicecomb-service-center/syncer/rpc"
)
+
+func ShouldTrustPeerServer() bool {
+ return globalHealthChecker.ShouldTrustPeerServer()
+}
+
+type HealthChecker struct {
+ checkIntervalBySecond uint
+ latestCheckResult *Resp
+ latestCheckErr error
+
+ // 为了容忍网络抖动,使用滑动窗口判断状态
+ failureWindow *HealthCheckWindow
+ // 恢复窗口,成功率要求更高,用于数据同步的恢复
+ recoveryWindow *HealthCheckWindow
+ shouldTrustPeerServer bool
+}
+
+func (h *HealthChecker) check() {
+ h.latestCheckResult, h.latestCheckErr = checkPeerStatus()
+ if h.latestCheckErr != nil || h.latestCheckResult == nil ||
len(h.latestCheckResult.Peers) == 0 {
+ h.AddResult(false)
+ return
+ }
+ if h.latestCheckResult.Peers[0].Status != rpc.HealthStatusConnected {
+ h.AddResult(false)
+ return
+ }
+ h.AddResult(true)
+}
+
+func (h *HealthChecker) ShouldTrustPeerServer() bool {
+ return h.shouldTrustPeerServer
+}
+
+func (h *HealthChecker) AddResult(pass bool) {
+ h.failureWindow.AddResult(pass)
+ h.recoveryWindow.AddResult(pass)
+
+ shouldTrustPeerServerNew := true
+ if h.shouldTrustPeerServer {
+ // 健康 > 不健康
+ shouldTrustPeerServerNew = h.failureWindow.IsHealthy()
+ } else {
+ // 不健康 > 健康
+ shouldTrustPeerServerNew = h.recoveryWindow.IsHealthy()
+ }
+ if h.shouldTrustPeerServer != shouldTrustPeerServerNew {
+ log.Info(fmt.Sprintf("should trust peer server changed, old:
%v, new: %v", h.shouldTrustPeerServer, shouldTrustPeerServerNew))
+ h.shouldTrustPeerServer = shouldTrustPeerServerNew
+ return
+ }
+}
+
+func (h *HealthChecker) LatestHealthCheckResult() (*Resp, error) {
+ return h.latestCheckResult, h.latestCheckErr
+}
+
+func (h *HealthChecker) RunChecker() {
+ h.check()
+ gopool.Go(func(ctx context.Context) {
+ for {
+ select {
+ case <-ctx.Done():
+ return
+ case
<-time.After(time.Duration(h.checkIntervalBySecond) * time.Second):
+ h.check()
+ }
+ }
+ })
+}
diff --git a/syncer/service/admin/check_test.go
b/syncer/service/admin/check_test.go
new file mode 100644
index 00000000..f1a22dc9
--- /dev/null
+++ b/syncer/service/admin/check_test.go
@@ -0,0 +1,68 @@
+package admin
+
+import (
+ "testing"
+
+ "github.com/stretchr/testify/assert"
+)
+
// TestHealthChecker_AddResult walks the checker through trusted -> untrusted
// -> trusted and verifies which window drives each transition.
func TestHealthChecker_AddResult(t *testing.T) {
	checker := &HealthChecker{
		checkIntervalBySecond: 1,
		failureWindow:         NewHealthCheckWindow(8, 5),
		recoveryWindow:        NewHealthCheckWindow(4, 2),
		shouldTrustPeerServer: true,
	}
	feed := func(results ...bool) {
		for _, r := range results {
			checker.AddResult(r)
		}
	}
	expect := func(failureHealthy, recoveryHealthy, trusted bool) {
		t.Helper()
		assert.Equal(t, failureHealthy, checker.failureWindow.IsHealthy())
		assert.Equal(t, recoveryHealthy, checker.recoveryWindow.IsHealthy())
		assert.Equal(t, trusted, checker.ShouldTrustPeerServer())
	}

	// Only passes: everything is healthy and the peer is trusted.
	for i := 0; i < 10; i++ {
		feed(true)
		expect(true, true, true)
	}

	// Window: t t t t f f f f. The recovery window fails first, but while the
	// peer is trusted only the failure window matters, so trust is kept.
	feed(false, false, false, false)
	expect(true, false, true)

	// Window: t t t f f f f f. The failure window trips and trust is revoked.
	feed(false)
	expect(false, false, false)

	// Only failures.
	for i := 0; i < 10; i++ {
		feed(false)
		expect(false, false, false)
	}
	assert.ElementsMatch(t, []bool{false, false, false, false, false, false, false, false}, checker.failureWindow.checkPassResults)
	assert.ElementsMatch(t, []bool{false, false, false, false}, checker.recoveryWindow.checkPassResults)

	// Alternating outcomes heal the failure window. While untrusted only the
	// recovery window matters, and it has not healed, so trust is withheld.
	feed(true, false, true, false, true, false, true, false)
	assert.ElementsMatch(t, []bool{true, false, true, false, true, false, true, false}, checker.failureWindow.checkPassResults)
	assert.ElementsMatch(t, []bool{true, false, true, false}, checker.recoveryWindow.checkPassResults)
	expect(true, false, false)

	// Two more passes heal the recovery window and trust is restored.
	feed(true, true)
	expect(true, true, true)
}
diff --git a/syncer/service/admin/health.go b/syncer/service/admin/health.go
index c47ae420..134ac1a5 100644
--- a/syncer/service/admin/health.go
+++ b/syncer/service/admin/health.go
@@ -30,6 +30,8 @@ import (
"google.golang.org/grpc/status"
"github.com/apache/servicecomb-service-center/client"
+ "github.com/apache/servicecomb-service-center/datasource/etcd/event"
+
"github.com/apache/servicecomb-service-center/datasource/etcd/state/kvstore"
"github.com/apache/servicecomb-service-center/pkg/log"
pkgrpc "github.com/apache/servicecomb-service-center/pkg/rpc"
"github.com/apache/servicecomb-service-center/server/plugin/security/cipher"
@@ -43,11 +45,16 @@ import (
const (
	scheme      = "grpc"
	serviceName = "syncer"
)

// Periods of the background peer-health machinery. The event replay period
// is suggested to be about a third of the health check period.
const (
	// checkPeerHealthIntervalBySecond is the delay, in seconds, between peer
	// health checks.
	checkPeerHealthIntervalBySecond = 15
	// eventPushCheckIntervalBySecond is how often, in seconds, buffered
	// instance events are considered for replay.
	eventPushCheckIntervalBySecond = 5
)
var (
- peerInfos []*PeerInfo
- ErrConfigIsEmpty = errors.New("sync config is empty")
+ peerInfos []*PeerInfo
+ ErrConfigIsEmpty = errors.New("sync config is empty")
+ globalHealthChecker *HealthChecker
)
type Resp struct {
@@ -98,9 +105,21 @@ func Init() {
}
peerInfos = append(peerInfos, &PeerInfo{Peer: p, ClientConn:
conn})
}
+
+ globalHealthChecker = &HealthChecker{
+ checkIntervalBySecond: checkPeerHealthIntervalBySecond,
+ // 失败阈值,最近8次检查,5次失败即视为不健康,120s。
+ failureWindow: NewHealthCheckWindow(8, 5),
+ // 恢复阈值,最近6次检查,最多2次失败即视为不健康,即最多1次失败,90s。
+ recoveryWindow: NewHealthCheckWindow(6, 2),
+ shouldTrustPeerServer: true, //
默认信任对端,只有通过检查,确认对端无法连接,即两个SC同步异常,才认为对端数据不可信任
+ }
+ globalHealthChecker.RunChecker()
+ h := NewInstanceEventHandler(globalHealthChecker,
eventPushCheckIntervalBySecond*time.Second, event.NewInstanceEventHandler())
+ kvstore.AddEventHandler(h)
}
-func Health() (*Resp, error) {
+func checkPeerStatus() (*Resp, error) {
if len(peerInfos) <= 0 {
return nil, ErrConfigIsEmpty
}
@@ -124,6 +143,10 @@ func Health() (*Resp, error) {
return resp, nil
}
+func Health() (*Resp, error) {
+ return checkPeerStatus()
+}
+
func getPeerStatus(peerInfo *PeerInfo) string {
if peerInfo.ClientConn == nil {
log.Warn("clientConn is nil")
diff --git a/syncer/service/admin/instance_event_handler.go
b/syncer/service/admin/instance_event_handler.go
new file mode 100644
index 00000000..928f055a
--- /dev/null
+++ b/syncer/service/admin/instance_event_handler.go
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package admin
+
+import (
+ "context"
+ "fmt"
+ "sync"
+ "time"
+
+ pb "github.com/go-chassis/cari/discovery"
+ "github.com/go-chassis/foundation/gopool"
+
+ "github.com/apache/servicecomb-service-center/datasource"
+ "github.com/apache/servicecomb-service-center/datasource/etcd/sd"
+
"github.com/apache/servicecomb-service-center/datasource/etcd/state/kvstore"
+ "github.com/apache/servicecomb-service-center/pkg/log"
+ "github.com/apache/servicecomb-service-center/pkg/util"
+ "github.com/apache/servicecomb-service-center/server/service/disco"
+)
+
+// InstanceEventHandler 对端SC同步异常期间,无法查询到在对端SC注册的实例,因此这期间对端SC的实例的事件,对客户端而言是无效事件,
+// 需要在对端SC同步恢复后,处理一次
+type InstanceEventHandler struct {
+ handler kvstore.EventHandler
+ interval time.Duration
+ healthChecker *HealthChecker
+
+ events map[string]kvstore.Event
+ eventsLock sync.Mutex
+}
+
+func (h *InstanceEventHandler) Type() kvstore.Type {
+ return sd.TypeInstance
+}
+
+func (h *InstanceEventHandler) OnEvent(evt kvstore.Event) {
+ if h.healthChecker.ShouldTrustPeerServer() {
+ return
+ }
+ instance, ignore := checkShouldIgnoreEvent(evt)
+ if ignore {
+ return
+ }
+ h.add(evt, instance)
+}
+
+func (h *InstanceEventHandler) exportAndClearEvents() []kvstore.Event {
+ h.eventsLock.Lock()
+ defer h.eventsLock.Unlock()
+ result := make([]kvstore.Event, 0, len(h.events))
+ for _, evt := range h.events {
+ result = append(result, evt)
+ }
+ h.events = make(map[string]kvstore.Event)
+ return result
+}
+
+func (h *InstanceEventHandler) add(evt kvstore.Event, instance
*pb.MicroServiceInstance) {
+ h.eventsLock.Lock()
+ defer h.eventsLock.Unlock()
+ _, ok := h.events[instance.ServiceId] // 每个服务仅保留一个事件
+ if !ok {
+ evt.Type = pb.EVT_UPDATE // 统一改为刷新事件,避免影响配额
+ h.events[instance.ServiceId] = evt
+ }
+}
+
+func (h *InstanceEventHandler) run() {
+ gopool.Go(func(ctx context.Context) {
+ for {
+ select {
+ case <-ctx.Done():
+ return
+ case <-time.After(h.interval):
+ if !h.healthChecker.ShouldTrustPeerServer() {
+ continue
+ }
+ // 对端SC恢复后,再重新处理事件
+ events := h.exportAndClearEvents()
+ h.handleAgain(events)
+ }
+ }
+ })
+}
+func (h *InstanceEventHandler) handleAgain(events []kvstore.Event) {
+ for _, evt := range events {
+ h.handler.OnEvent(evt)
+ }
+}
+
+func NewInstanceEventHandler(healthChecker *HealthChecker, interval
time.Duration, handler kvstore.EventHandler) *InstanceEventHandler {
+ h := &InstanceEventHandler{
+ handler: handler,
+ events: make(map[string]kvstore.Event),
+ eventsLock: sync.Mutex{},
+ interval: interval,
+ healthChecker: healthChecker,
+ }
+ h.run()
+ return h
+}
+
+func checkShouldIgnoreEvent(evt kvstore.Event) (storedInstance
*pb.MicroServiceInstance, ignore bool) {
+ action := evt.Type
+ // 初始化是内部事件,忽略
+ // 本来就查不到,推了删除事件后还是查不到,因此删除事件也忽略
+ if action == pb.EVT_INIT || action == pb.EVT_DELETE {
+ return nil, true
+ }
+ instance, ok := evt.KV.Value.(*pb.MicroServiceInstance)
+ if !ok {
+ log.Error("failed to assert microServiceInstance",
datasource.ErrAssertFail)
+ return nil, true
+ }
+ // 连接在本SC的实例,忽略
+ if util.IsMapFullyMatch(instance.Properties,
disco.GetInnerProperties()) {
+ return nil, true
+ }
+ log.Info(fmt.Sprintf("caught [%s] service[%s] instance[%s] event,
endpoints %v, will handle it again when peer sc recovery",
+ action, instance.ServiceId, instance.InstanceId,
instance.Endpoints))
+ return instance, false
+}
diff --git a/syncer/service/admin/instance_event_handler_test.go
b/syncer/service/admin/instance_event_handler_test.go
new file mode 100644
index 00000000..fd4ac0d1
--- /dev/null
+++ b/syncer/service/admin/instance_event_handler_test.go
@@ -0,0 +1,158 @@
+package admin
+
+import (
+ "sync"
+ "testing"
+ "time"
+
+ pb "github.com/go-chassis/cari/discovery"
+ "github.com/stretchr/testify/assert"
+
+ "github.com/apache/servicecomb-service-center/datasource/etcd/sd"
+
"github.com/apache/servicecomb-service-center/datasource/etcd/state/kvstore"
+)
+
+type mockEventHandler struct {
+ processedEvts []kvstore.Event
+}
+
+func (h *mockEventHandler) Type() kvstore.Type {
+ return sd.TypeInstance
+}
+
+func (h *mockEventHandler) OnEvent(evt kvstore.Event) {
+ h.processedEvts = append(h.processedEvts, evt)
+}
+
+func TestInstanceEventHandler_OnEvent(t *testing.T) {
+ h := NewInstanceEventHandler(&HealthChecker{
+ shouldTrustPeerServer: false,
+ }, 100*time.Millisecond, kvstore.EventHandler(nil))
+
+ evt := kvstore.Event{
+ Type: pb.EVT_CREATE,
+ KV: &kvstore.KeyValue{
+ Value: &pb.MicroServiceInstance{
+ InstanceId: "test_instnace",
+ ServiceId: "test_service",
+ },
+ },
+ }
+ h.OnEvent(evt)
+ assert.Equal(t, 1, len(h.events))
+
+ // 信任对端SC,不处理
+ h.events = make(map[string]kvstore.Event)
+ h.healthChecker.shouldTrustPeerServer = true
+ h.OnEvent(evt)
+ assert.Equal(t, 0, len(h.events))
+
+ h.events = make(map[string]kvstore.Event)
+ // 忽略的事件,不处理
+ evt.Type = pb.EVT_INIT
+ h.healthChecker.shouldTrustPeerServer = false
+ h.OnEvent(evt)
+ assert.Equal(t, 0, len(h.events))
+
+ // 处理
+ evt.Type = pb.EVT_CREATE
+ h.OnEvent(evt)
+ assert.Equal(t, 1, len(h.events))
+}
+
+func Test_checkShouldIgnoreEvent(t *testing.T) {
+ instanceDef := &pb.MicroServiceInstance{
+ InstanceId: "test_instnace",
+ ServiceId: "test_service",
+ Properties: map[string]string{
+ "engineID": "test_engineID_fake",
+ "engineName": "test_engineName",
+ },
+ }
+
+ evt := kvstore.Event{
+ Type: pb.EVT_CREATE,
+ KV: &kvstore.KeyValue{
+ Value: instanceDef,
+ },
+ }
+ instance, ignore := checkShouldIgnoreEvent(evt)
+ assert.False(t, ignore)
+ assert.Equal(t, instanceDef, instance)
+
+ evt.Type = pb.EVT_INIT
+ _, ignore = checkShouldIgnoreEvent(evt)
+ assert.True(t, ignore)
+ evt.Type = pb.EVT_DELETE
+ _, ignore = checkShouldIgnoreEvent(evt)
+ assert.True(t, ignore)
+ evt.Type = pb.EVT_UPDATE
+ _, ignore = checkShouldIgnoreEvent(evt)
+ assert.False(t, ignore)
+
+ evt.KV.Value = 1
+ _, ignore = checkShouldIgnoreEvent(evt)
+ assert.True(t, ignore)
+ evt.KV.Value = instanceDef
+ _, ignore = checkShouldIgnoreEvent(evt)
+ assert.False(t, ignore)
+
+ instanceDef.Properties["engineID"] = "test_engineID"
+ _, ignore = checkShouldIgnoreEvent(evt)
+ assert.True(t, ignore)
+ instanceDef.Properties["engineID"] = "test_engineID_fake"
+ _, ignore = checkShouldIgnoreEvent(evt)
+ assert.False(t, ignore)
+}
+
+func TestInstanceEventHandler_run(t *testing.T) {
+ mockHandler := &mockEventHandler{}
+ h := NewInstanceEventHandler(&HealthChecker{
+ shouldTrustPeerServer: false,
+ }, 100*time.Millisecond, mockHandler)
+
+ var wg sync.WaitGroup
+ numGoroutines := 50
+ for i := 0; i < numGoroutines; i++ {
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+ h.OnEvent(kvstore.Event{
+ Type: pb.EVT_CREATE,
+ KV: &kvstore.KeyValue{
+ Value: &pb.MicroServiceInstance{
+ InstanceId: "test_instance",
+ ServiceId: "test_service",
+ },
+ },
+ })
+ h.OnEvent(kvstore.Event{
+ Type: pb.EVT_CREATE,
+ KV: &kvstore.KeyValue{
+ Value: &pb.MicroServiceInstance{
+ InstanceId: "test_instance1",
+ ServiceId: "test_service1",
+ },
+ },
+ })
+ }()
+ }
+ wg.Wait()
+ assert.Equal(t, 2, len(h.events)) // 一个服务多个事件仅保留一个
+ assert.Equal(t, 0, len(mockHandler.processedEvts))
+
+ time.Sleep(110 * time.Millisecond)
+ assert.Equal(t, 2, len(h.events))
+ assert.Equal(t, 0, len(mockHandler.processedEvts)) // 对端不可信,只保存,不处理
+
+ h.healthChecker.shouldTrustPeerServer = true
+ time.Sleep(110 * time.Millisecond)
+ assert.Equal(t, 0, len(h.events)) //
对端可信,处理事件,事件处理完成清空
+ assert.Equal(t, 2, len(mockHandler.processedEvts)) //
处理事件
+ assert.ElementsMatch(t, []string{"test_service", "test_service1"}, //
处理服务为设置的服务
+
[]string{mockHandler.processedEvts[0].KV.Value.(*pb.MicroServiceInstance).ServiceId,
mockHandler.processedEvts[1].KV.Value.(*pb.MicroServiceInstance).ServiceId})
+
+ for _, evt := range mockHandler.processedEvts {
+ assert.Equal(t, pb.EVT_UPDATE, evt.Type)
+ }
+}
diff --git a/syncer/service/admin/window.go b/syncer/service/admin/window.go
new file mode 100644
index 00000000..04c450c4
--- /dev/null
+++ b/syncer/service/admin/window.go
@@ -0,0 +1,32 @@
+package admin
+
// HealthCheckWindow is a sliding window over the most recent health check
// outcomes.
type HealthCheckWindow struct {
	windowSize       int
	checkPassResults []bool // oldest first; at most windowSize entries
	failureThreshold int
}

// NewHealthCheckWindow creates an empty window that keeps the last windowSize
// outcomes and reports unhealthy once it holds failureThreshold failures.
func NewHealthCheckWindow(windowSize int, failureThreshold int) *HealthCheckWindow {
	w := new(HealthCheckWindow)
	w.windowSize = windowSize
	w.failureThreshold = failureThreshold
	w.checkPassResults = make([]bool, 0, windowSize)
	return w
}

// AddResult appends one outcome, evicting the oldest one when the window
// overflows.
func (h *HealthCheckWindow) AddResult(pass bool) {
	results := append(h.checkPassResults, pass)
	if len(results) > h.windowSize {
		results = results[1:]
	}
	h.checkPassResults = results
}

// IsHealthy reports whether the window holds fewer than failureThreshold
// failed outcomes.
func (h *HealthCheckWindow) IsHealthy() bool {
	failures := 0
	for _, passed := range h.checkPassResults {
		if passed {
			continue
		}
		failures++
	}
	return failures < h.failureThreshold
}
diff --git a/syncer/service/admin/window_test.go
b/syncer/service/admin/window_test.go
new file mode 100644
index 00000000..3f700467
--- /dev/null
+++ b/syncer/service/admin/window_test.go
@@ -0,0 +1,71 @@
+package admin
+
+import (
+ "testing"
+
+ "github.com/stretchr/testify/assert"
+)
+
// TestHealthCheckWindow_AddResult checks that the window grows to windowSize
// and then evicts its oldest outcomes.
func TestHealthCheckWindow_AddResult(t *testing.T) {
	w := NewHealthCheckWindow(5, 2)
	// the window grows one entry at a time until it is full...
	for n := 1; n <= 5; n++ {
		w.AddResult(true)
		assert.Equal(t, n, len(w.checkPassResults))
	}
	// ...after which its length stays at windowSize
	for n := 0; n < 5; n++ {
		w.AddResult(true)
		assert.Equal(t, w.windowSize, len(w.checkPassResults))
	}
	// new outcomes evict the oldest ones
	w.AddResult(false)
	w.AddResult(false)
	assert.ElementsMatch(t, []bool{true, true, true, false, false}, w.checkPassResults)
}

// TestHealthCheckWindow_IsHealthy feeds a 4-entry window with threshold 2 and
// checks the verdict after every step.
func TestHealthCheckWindow_IsHealthy(t *testing.T) {
	w := NewHealthCheckWindow(4, 2)
	steps := []struct {
		pass    bool
		healthy bool
		window  string // window contents after the step, oldest first
	}{
		{false, true, "f"},
		{false, false, "f f"}, // two failures reach the threshold
		{false, false, "f f f"},
		{false, false, "f f f f"},
		{false, false, "f f f f"},
		{false, false, "f f f f"},
		{true, false, "f f f t"}, // three failures
		{true, false, "f f t t"}, // two failures
		{true, true, "f t t t"},  // one failure: healthy again
		{false, true, "t t t f"}, // one failure
		{false, false, "t t f f"}, // two failures: unhealthy
		{true, false, "t f f t"},
		{true, false, "f f t t"},
		{true, true, "f t t t"}, // below the threshold: healthy
		{true, true, "t t t t"},
		{true, true, "t t t t"},
		{true, true, "t t t t"},
	}
	for i, s := range steps {
		w.AddResult(s.pass)
		assert.Equalf(t, s.healthy, w.IsHealthy(), "step %d (window %s)", i, s.window)
	}
}