This is an automated email from the ASF dual-hosted git repository. robotljw pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/servicecomb-service-center.git
The following commit(s) were added to refs/heads/master by this push: new 8476900 fix cache bugs new c2de051 Merge pull request #1047 from DFSOrange/319 8476900 is described below commit 8476900a587156c91efb46752a4c9eefc10dd0fc Author: DFSOrange <pqa1996...@163.com> AuthorDate: Wed Jun 16 09:28:24 2021 +0800 fix cache bugs --- datasource/mongo/event/instance_event_handler.go | 3 -- datasource/mongo/sd/dep_cache.go | 11 +++- datasource/mongo/sd/instance_cache.go | 13 ++++- datasource/mongo/sd/instancec_test.go | 66 ++++++++++++++++++++++++ datasource/mongo/sd/mongo_cacher.go | 5 ++ datasource/mongo/sd/rule_cache.go | 12 ++++- datasource/mongo/sd/service_cache.go | 11 +++- pkg/event/bus_service.go | 2 +- server/handler/metrics/metrics.go | 6 ++- 9 files changed, 120 insertions(+), 9 deletions(-) diff --git a/datasource/mongo/event/instance_event_handler.go b/datasource/mongo/event/instance_event_handler.go index 691e07d..4f2d57d 100644 --- a/datasource/mongo/event/instance_event_handler.go +++ b/datasource/mongo/event/instance_event_handler.go @@ -50,9 +50,6 @@ func (h InstanceEventHandler) Type() string { func (h InstanceEventHandler) OnEvent(evt sd.MongoEvent) { action := evt.Type - if evt.Type == discovery.EVT_UPDATE { - return - } instance, ok := evt.Value.(model.Instance) if !ok { log.Error("failed to assert instance", datasource.ErrAssertFail) diff --git a/datasource/mongo/sd/dep_cache.go b/datasource/mongo/sd/dep_cache.go index 124cbec..cda7aa4 100644 --- a/datasource/mongo/sd/dep_cache.go +++ b/datasource/mongo/sd/dep_cache.go @@ -18,6 +18,7 @@ package sd import ( + "reflect" "strings" "github.com/apache/servicecomb-service-center/datasource/mongo/client/model" @@ -123,7 +124,15 @@ func (s *depStore) ProcessDelete(event MongoEvent) { } func (s *depStore) isValueNotUpdated(value interface{}, newValue interface{}) bool { - return false + newDep, ok := newValue.(model.DependencyRule) + if !ok { + return true + } + oldDep, ok := value.(model.DependencyRule) + if !ok { + return true + } + return reflect.DeepEqual(newDep, oldDep) } func genDepServiceKey(dep model.DependencyRule) string { diff --git a/datasource/mongo/sd/instance_cache.go b/datasource/mongo/sd/instance_cache.go index db8d371..5072835 100644 --- a/datasource/mongo/sd/instance_cache.go +++ b/datasource/mongo/sd/instance_cache.go @@ -18,6 +18,8 @@ package sd import ( + "reflect" + "github.com/apache/servicecomb-service-center/datasource/mongo/client/model" "github.com/apache/servicecomb-service-center/datasource/sdcommon" rmodel "github.com/go-chassis/cari/discovery" @@ -120,7 +122,16 @@ func (s *instanceStore) ProcessDelete(event MongoEvent) { } func (s *instanceStore) isValueNotUpdated(value interface{}, newValue interface{}) bool { - return true + newInst, ok := newValue.(model.Instance) + if !ok { + return true + } + oldInst, ok := value.(model.Instance) + if !ok { + return true + } + newInst.RefreshTime = oldInst.RefreshTime + return reflect.DeepEqual(newInst, oldInst) } func genInstServiceID(inst model.Instance) string { diff --git a/datasource/mongo/sd/instancec_test.go b/datasource/mongo/sd/instancec_test.go index 9b2501d..29aaf40 100644 --- a/datasource/mongo/sd/instancec_test.go +++ b/datasource/mongo/sd/instancec_test.go @@ -21,6 +21,7 @@ package sd import ( "testing" + "time" "github.com/apache/servicecomb-service-center/datasource/mongo/client/model" "github.com/go-chassis/cari/discovery" @@ -83,3 +84,68 @@ func TestInstCacheBasicFunc(t *testing.T) { assert.Len(t, instanceCache.cache.GetValue("svcid"), 0) }) } + +func TestInstValueUpdate(t *testing.T) { + inst1 := model.Instance{ + Domain: "d1", + Project: "p1", + RefreshTime: time.Time{}, + Instance: &discovery.MicroServiceInstance{ + InstanceId: "123", + Version: "1.0", + }, + } + inst2 := model.Instance{ + Domain: "d1", + Project: "p1", + RefreshTime: time.Time{}, + Instance: &discovery.MicroServiceInstance{ + InstanceId: "123", + Version: "1.0", + }, + } + inst3 := model.Instance{ + Domain: "d2", + Project: "p2", + RefreshTime: time.Time{}, + Instance: &discovery.MicroServiceInstance{ + InstanceId: "123", + Version: "1.0", + }, + } + inst4 := model.Instance{ + Domain: "d2", + Project: "p2", + RefreshTime: time.Time{}, + Instance: &discovery.MicroServiceInstance{ + InstanceId: "123", + Version: "1.1", + }, + } + inst5 := model.Instance{ + Domain: "d2", + Project: "p2", + RefreshTime: time.Now(), + Instance: &discovery.MicroServiceInstance{ + InstanceId: "123", + Version: "1.1", + }, + } + + t.Run("given the same instances expect equal result", func(t *testing.T) { + res := instanceCache.cache.isValueNotUpdated(inst1, inst2) + assert.True(t, res) + }) + t.Run("given instances with the different domain expect not equal result", func(t *testing.T) { + res := instanceCache.cache.isValueNotUpdated(inst2, inst3) + assert.False(t, res) + }) + t.Run("given instances with the different version expect not equal result", func(t *testing.T) { + res := instanceCache.cache.isValueNotUpdated(inst3, inst4) + assert.False(t, res) + }) + t.Run("given instances with the different refresh time expect equal result", func(t *testing.T) { + res := instanceCache.cache.isValueNotUpdated(inst4, inst5) + assert.True(t, res) + }) +} diff --git a/datasource/mongo/sd/mongo_cacher.go b/datasource/mongo/sd/mongo_cacher.go index 986d15f..9a9c7b4 100644 --- a/datasource/mongo/sd/mongo_cacher.go +++ b/datasource/mongo/sd/mongo_cacher.go @@ -193,6 +193,10 @@ func (c *MongoCacher) handleEventBus(eventbus *sdcommon.EventBus) error { } for _, resource := range resp.Resources { + if resource.Value == nil { + log.Error(fmt.Sprintf("get nil value while watch for mongocache,the docID is %s", resource.Key), nil) + break + } action := resp.Action var event MongoEvent switch action { @@ -265,6 +269,7 @@ func (c *MongoCacher) filter(infos []*sdcommon.Resource) []MongoEvent { for block := range eventsCh { for _, e := range block { if e.Value == nil { + log.Error(fmt.Sprintf("get nil value while do list, the docID is %s", e.DocumentID), nil) break } events = append(events, e) diff --git a/datasource/mongo/sd/rule_cache.go b/datasource/mongo/sd/rule_cache.go index 8809eab..638256b 100644 --- a/datasource/mongo/sd/rule_cache.go +++ b/datasource/mongo/sd/rule_cache.go @@ -18,6 +18,8 @@ package sd import ( + "reflect" + "github.com/apache/servicecomb-service-center/datasource/mongo/client/model" "github.com/apache/servicecomb-service-center/datasource/sdcommon" "go.mongodb.org/mongo-driver/bson" @@ -115,7 +117,15 @@ func (s *ruleStore) ProcessDelete(event MongoEvent) { } func (s *ruleStore) isValueNotUpdated(value interface{}, newValue interface{}) bool { - return false + newRule, ok := newValue.(model.Rule) + if !ok { + return true + } + oldRule, ok := value.(model.Rule) + if !ok { + return true + } + return reflect.DeepEqual(newRule, oldRule) } func genRuleServiceID(rule model.Rule) string { diff --git a/datasource/mongo/sd/service_cache.go b/datasource/mongo/sd/service_cache.go index 23d4f13..03d1764 100644 --- a/datasource/mongo/sd/service_cache.go +++ b/datasource/mongo/sd/service_cache.go @@ -18,6 +18,7 @@ package sd import ( + "reflect" "strings" "github.com/apache/servicecomb-service-center/datasource/mongo/client/model" @@ -119,7 +120,15 @@ func (s *serviceStore) ProcessDelete(event MongoEvent) { } func (s *serviceStore) isValueNotUpdated(value interface{}, newValue interface{}) bool { - return false + newService, ok := newValue.(model.Service) + if !ok { + return true + } + oldService, ok := value.(model.Service) + if !ok { + return true + } + return reflect.DeepEqual(newService, oldService) } func genServiceID(svc model.Service) string { diff --git a/pkg/event/bus_service.go b/pkg/event/bus_service.go index 2b53597..812a241 100644 --- a/pkg/event/bus_service.go +++ b/pkg/event/bus_service.go @@ -128,7 +128,7 @@ func (s *BusService) Fire(evt Event) error { bus, ok := s.buses[evt.Type()] if !ok { s.mux.RUnlock() - return fmt.Errorf("unknown event type[%s]", evt.Type()) + return fmt.Errorf("no %s subscriber on this service center", evt.Type()) } s.mux.RUnlock() bus.Fire(evt) diff --git a/server/handler/metrics/metrics.go b/server/handler/metrics/metrics.go index 36dca1d..33c2bfa 100644 --- a/server/handler/metrics/metrics.go +++ b/server/handler/metrics/metrics.go @@ -21,6 +21,8 @@ import ( "net/http" "time" + "github.com/apache/servicecomb-service-center/server/config" + "github.com/apache/servicecomb-service-center/pkg/chain" "github.com/apache/servicecomb-service-center/pkg/log" "github.com/apache/servicecomb-service-center/pkg/rest" @@ -44,5 +46,7 @@ func (h *Handler) Handle(i *chain.Invocation) { } func RegisterHandlers() { - chain.RegisterHandler(rest.ServerChainName, &Handler{}) + if config.GetBool("metrics.enable", false) { + chain.RegisterHandler(rest.ServerChainName, &Handler{}) + } }