This is an automated email from the ASF dual-hosted git repository.
tianxiaoliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/servicecomb-service-center.git
The following commit(s) were added to refs/heads/master by this push:
new fcde10e restructure cache (#911)
fcde10e is described below
commit fcde10e1fe3598666cabe28353922c8753568895
Author: panqian <[email protected]>
AuthorDate: Fri Apr 16 10:36:32 2021 +0800
restructure cache (#911)
---
datasource/cache/ms_cache.go | 147 ++++++++++
datasource/mongo/client/dao/dep.go | 74 +++++
datasource/mongo/client/dao/instance.go | 9 +
datasource/mongo/client/dao/microservice.go | 46 ++-
datasource/mongo/client/dao/rule.go | 8 +
datasource/mongo/dep_util.go | 42 +--
datasource/mongo/dependency_query.go | 4 +-
datasource/mongo/event/instance_event_handler.go | 46 ++-
.../mongo/event/instance_event_handler_test.go | 2 -
datasource/mongo/ms.go | 149 ++++------
datasource/mongo/rule_util.go | 12 +-
datasource/mongo/sd/dep_cache.go | 132 +++++++++
datasource/mongo/sd/depcache_test.go | 93 ++++++
datasource/mongo/sd/{cache.go => doc_cache.go} | 50 ++--
.../mongo/sd/{options_test.go => docc_test.go} | 51 ++--
datasource/mongo/sd/event_proxy_test.go | 1 -
datasource/mongo/sd/{cache.go => hset.go} | 62 ++--
.../mongo/sd/{options_test.go => hset_test.go} | 47 +--
datasource/mongo/sd/index_cache.go | 71 +++++
datasource/mongo/sd/indexc_test.go | 21 ++
datasource/mongo/sd/instance_cache.go | 129 +++++++++
datasource/mongo/sd/instancec_test.go | 85 ++++++
datasource/mongo/sd/listwatch_inner.go | 58 +---
datasource/mongo/sd/listwatch_test.go | 27 +-
datasource/mongo/sd/mongo_cache.go | 177 ++----------
datasource/mongo/sd/mongo_cacher.go | 83 ++----
datasource/mongo/sd/mongo_cacher_test.go | 317 ---------------------
datasource/mongo/sd/options_test.go | 1 -
datasource/mongo/sd/rule_cache.go | 124 ++++++++
datasource/mongo/sd/service_cache.go | 133 +++++++++
datasource/mongo/sd/servicec_test.go | 94 ++++++
datasource/mongo/sd/types.go | 29 +-
datasource/mongo/sd/typestore.go | 25 +-
datasource/sdcommon/types.go | 2 -
34 files changed, 1475 insertions(+), 876 deletions(-)
diff --git a/datasource/cache/ms_cache.go b/datasource/cache/ms_cache.go
new file mode 100644
index 0000000..fa51f0c
--- /dev/null
+++ b/datasource/cache/ms_cache.go
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except request compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to request writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package cache
+
+import (
+ "context"
+
"github.com/apache/servicecomb-service-center/datasource/mongo/client/model"
+ "github.com/apache/servicecomb-service-center/datasource/mongo/sd"
+ "github.com/apache/servicecomb-service-center/pkg/util"
+ "github.com/go-chassis/cari/discovery"
+ "strings"
+)
+
+const (
+ Provider = "p"
+)
+
+func GetProviderServiceOfDeps(provider *discovery.MicroService)
(*discovery.MicroServiceDependency, bool) {
+ res := sd.Store().Dep().Cache().GetValue(genDepserivceKey(Provider,
provider))
+ deps, ok := transCacheToDep(res)
+ if !ok {
+ return nil, false
+ }
+ return deps[0], true
+}
+
+func transCacheToDep(cache []interface{})
([]*discovery.MicroServiceDependency, bool) {
+ res := make([]*discovery.MicroServiceDependency, 0, len(cache))
+ for _, v := range cache {
+ t, ok := v.(model.DependencyRule)
+ if !ok {
+ return nil, false
+ }
+ res = append(res, t.Dep)
+ }
+ if len(res) == 0 {
+ return nil, false
+ }
+ return res, true
+}
+
+func genDepserivceKey(ruleType string, service *discovery.MicroService) string
{
+ return strings.Join([]string{ruleType, service.AppId,
service.ServiceName, service.Version}, "/")
+}
+
+func GetMicroServiceInstancesByID(serviceID string)
([]*discovery.MicroServiceInstance, bool) {
+ cacheInstances := sd.Store().Instance().Cache().GetValue(serviceID)
+ insts, ok := transCacheToInsts(cacheInstances)
+ if !ok {
+ return nil, false
+ }
+ return insts, true
+}
+
+func transCacheToInsts(cache []interface{})
([]*discovery.MicroServiceInstance, bool) {
+ res := make([]*discovery.MicroServiceInstance, 0, len(cache))
+ for _, iter := range cache {
+ inst, ok := iter.(model.Instance)
+ if !ok {
+ return nil, false
+ }
+ res = append(res, inst.Instance)
+ }
+ if len(res) == 0 {
+ return nil, false
+ }
+ return res, true
+}
+
+func GetRulesByServiceID(serviceID string) ([]*model.Rule, bool){
+ cacheRes := sd.Store().Rule().Cache().GetValue(serviceID)
+ return transCacheToRules(cacheRes)
+}
+
+func transCacheToRules(cacheRules []interface{}) ([]*model.Rule, bool) {
+ res := make([]*model.Rule, 0, len(cacheRules))
+ for _, v := range cacheRules {
+ t, ok := v.(model.Rule)
+ if !ok {
+ return nil, false
+ }
+ res = append(res, &model.Rule{
+ Domain: t.Domain,
+ Project: t.Project,
+ ServiceID: t.ServiceID,
+ Rule: t.Rule,
+ })
+ }
+ if len(res)==0{
+ return nil,false
+ }
+ return res, true
+}
+
+func GetServiceByID(serviceID string) (*model.Service, bool) {
+ cacheRes := sd.Store().Service().Cache().GetValue(serviceID)
+ res, ok := transCacheToService(cacheRes)
+ if !ok {
+ return nil ,false
+ }
+ return res[0], true
+}
+
+func GetServiceID(ctx context.Context,key *discovery.MicroServiceKey)
(serviceID string, exist bool) {
+ cacheIndex := strings.Join([]string{util.ParseDomain(ctx),
util.ParseProject(ctx), key.AppId, key.ServiceName, key.Version}, "/")
+ res := sd.Store().Service().Cache().GetValue(cacheIndex)
+ cacheService, ok := transCacheToService(res)
+ if !ok {
+ return
+ }
+ return cacheService[0].Service.ServiceId, true
+}
+
+func transCacheToService(services []interface{}) ([]*model.Service, bool) {
+ res := make([]*model.Service, 0, len(services))
+ for _, v := range services {
+ t, ok := v.(model.Service)
+ if !ok {
+ return nil, false
+ }
+ res = append(res, &model.Service{
+ Domain: t.Domain,
+ Project: t.Project,
+ Tags: t.Tags,
+ Service: t.Service,
+ })
+ }
+ if len(res)==0{
+ return nil,false
+ }
+ return res, true
+}
\ No newline at end of file
diff --git a/datasource/mongo/client/dao/dep.go
b/datasource/mongo/client/dao/dep.go
new file mode 100644
index 0000000..16bc4c4
--- /dev/null
+++ b/datasource/mongo/client/dao/dep.go
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package dao
+
+import (
+ "context"
+ "strings"
+
+ "github.com/apache/servicecomb-service-center/datasource"
+ "github.com/apache/servicecomb-service-center/datasource/mongo/client"
+
"github.com/apache/servicecomb-service-center/datasource/mongo/client/model"
+ mutil
"github.com/apache/servicecomb-service-center/datasource/mongo/util"
+ "github.com/apache/servicecomb-service-center/pkg/util"
+ "github.com/go-chassis/cari/discovery"
+)
+
+const (
+ Provider = "p"
+ Consumer = "c"
+)
+
+func GetProviderDeps(ctx context.Context, provider *discovery.MicroService)
(*discovery.MicroServiceDependency, error) {
+ return getServiceofDeps(ctx, Provider, provider)
+}
+
+func getServiceofDeps(ctx context.Context, ruleType string, provider
*discovery.MicroService) (*discovery.MicroServiceDependency, error) {
+ filter := mutil.NewFilter(
+ mutil.ServiceType(ruleType),
+ mutil.ServiceKeyTenant(util.ParseDomainProject(ctx)),
+ mutil.ServiceKeyAppID(provider.AppId),
+ mutil.ServiceKeyServiceName(provider.ServiceName),
+ mutil.ServiceKeyServiceVersion(provider.Version),
+ )
+ depRule, err := getDeps(ctx, filter)
+ if err != nil {
+ return nil, err
+ }
+ return depRule.Dep, nil
+}
+
+func genDepserivceKey(ruleType string, service *discovery.MicroService) string
{
+ return strings.Join([]string{ruleType, service.AppId,
service.ServiceName, service.Version}, "/")
+}
+
+func getDeps(ctx context.Context, filter interface{}) (*model.DependencyRule,
error) {
+ findRes, err := client.GetMongoClient().FindOne(ctx,
model.CollectionDep, filter)
+ if err != nil {
+ return nil, err
+ }
+ var depRule *model.DependencyRule
+ if findRes.Err() != nil {
+ return nil, datasource.ErrNoData
+ }
+ err = findRes.Decode(&depRule)
+ if err != nil {
+ return nil, err
+ }
+ return depRule, nil
+}
diff --git a/datasource/mongo/client/dao/instance.go
b/datasource/mongo/client/dao/instance.go
index a59be3e..47a7efa 100644
--- a/datasource/mongo/client/dao/instance.go
+++ b/datasource/mongo/client/dao/instance.go
@@ -20,6 +20,9 @@ package dao
import (
"context"
+ mutil
"github.com/apache/servicecomb-service-center/datasource/mongo/util"
+ "go.mongodb.org/mongo-driver/bson"
+
"github.com/go-chassis/cari/discovery"
"go.mongodb.org/mongo-driver/mongo/options"
@@ -61,6 +64,12 @@ func GetInstances(ctx context.Context, filter interface{})
([]*model.Instance, e
return instances, nil
}
+func GetMicroServiceInstancesByID(ctx context.Context, serviceID string)
([]*discovery.MicroServiceInstance, error) {
+ filter := mutil.NewFilter(mutil.InstanceServiceID(serviceID))
+ option := &options.FindOptions{Sort:
bson.M{mutil.ConnectWithDot([]string{model.ColumnInstance,
model.ColumnVersion}): -1}}
+ return GetMicroServiceInstances(ctx, filter, option)
+}
+
func GetMicroServiceInstances(ctx context.Context, filter interface{}, opts
...*options.FindOptions) ([]*discovery.MicroServiceInstance, error) {
res, err := client.GetMongoClient().Find(ctx, model.CollectionInstance,
filter, opts...)
if err != nil {
diff --git a/datasource/mongo/client/dao/microservice.go
b/datasource/mongo/client/dao/microservice.go
index 51b1c58..7cd39ec 100644
--- a/datasource/mongo/client/dao/microservice.go
+++ b/datasource/mongo/client/dao/microservice.go
@@ -19,7 +19,11 @@ package dao
import (
"context"
+ "errors"
+
+ mutil
"github.com/apache/servicecomb-service-center/datasource/mongo/util"
"github.com/go-chassis/cari/discovery"
+ "go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo/options"
"github.com/apache/servicecomb-service-center/datasource"
@@ -27,6 +31,10 @@ import (
"github.com/apache/servicecomb-service-center/datasource/mongo/client/model"
)
+func GetServiceByID(ctx context.Context, serviceID string) (*model.Service,
error) {
+ return GetService(ctx, mutil.NewBasicFilter(ctx,
mutil.ServiceServiceID(serviceID)))
+}
+
func GetService(ctx context.Context, filter interface{}, opts
...*options.FindOneOptions) (*model.Service, error) {
result, err := client.GetMongoClient().FindOne(ctx,
model.CollectionService, filter, opts...)
if err != nil {
@@ -34,7 +42,6 @@ func GetService(ctx context.Context, filter interface{}, opts
...*options.FindOn
}
var svc *model.Service
if result.Err() != nil {
- //not get any service,not db err
return nil, datasource.ErrNoData
}
err = result.Decode(&svc)
@@ -44,6 +51,43 @@ func GetService(ctx context.Context, filter interface{},
opts ...*options.FindOn
return svc, nil
}
+func GetServiceID(ctx context.Context, key *discovery.MicroServiceKey)
(string, error) {
+ filter := mutil.NewBasicFilter(
+ ctx,
+ mutil.ServiceEnv(key.Environment),
+ mutil.ServiceAppID(key.AppId),
+ mutil.ServiceServiceName(key.ServiceName),
+ mutil.ServiceVersion(key.Version),
+ )
+ id, err := getServiceID(ctx, filter)
+ if err != nil && !errors.Is(err, datasource.ErrNoData) {
+ return "", err
+ }
+ if len(id) == 0 && len(key.Alias) != 0 {
+ filter = mutil.NewBasicFilter(
+ ctx,
+ mutil.ServiceEnv(key.Environment),
+ mutil.ServiceAppID(key.AppId),
+ mutil.ServiceAlias(key.Alias),
+ mutil.ServiceVersion(key.Version),
+ )
+ return getServiceID(ctx, filter)
+ }
+ return id, nil
+}
+
+func getServiceID(ctx context.Context, filter bson.M) (serviceID string, err
error) {
+ svc, err := GetService(ctx, filter)
+ if err != nil {
+ return
+ }
+ if svc != nil {
+ serviceID = svc.Service.ServiceId
+ return
+ }
+ return
+}
+
func GetServices(ctx context.Context, filter interface{}, opts
...*options.FindOptions) ([]*model.Service, error) {
res, err := client.GetMongoClient().Find(ctx, model.CollectionService,
filter, opts...)
if err != nil {
diff --git a/datasource/mongo/client/dao/rule.go
b/datasource/mongo/client/dao/rule.go
index 522a119..4179ae6 100644
--- a/datasource/mongo/client/dao/rule.go
+++ b/datasource/mongo/client/dao/rule.go
@@ -20,6 +20,9 @@ package dao
import (
"context"
+ mutil
"github.com/apache/servicecomb-service-center/datasource/mongo/util"
+ "github.com/apache/servicecomb-service-center/pkg/util"
+
"github.com/go-chassis/cari/discovery"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo/options"
@@ -29,6 +32,11 @@ import (
"github.com/apache/servicecomb-service-center/pkg/log"
)
+func GetRulesByServiceID(ctx context.Context, serviceID string)
([]*model.Rule, error) {
+ filter := mutil.NewDomainProjectFilter(util.ParseDomain(ctx),
util.ParseDomain(ctx), mutil.ServiceID(serviceID))
+ return GetRules(ctx, filter)
+}
+
func GetRules(ctx context.Context, filter interface{}) ([]*model.Rule, error) {
cursor, err := client.GetMongoClient().Find(ctx, model.CollectionRule,
filter)
if err != nil {
diff --git a/datasource/mongo/dep_util.go b/datasource/mongo/dep_util.go
index 6a39b4e..f2b7c3a 100644
--- a/datasource/mongo/dep_util.go
+++ b/datasource/mongo/dep_util.go
@@ -20,28 +20,25 @@ package mongo
import (
"context"
"fmt"
-
+ "github.com/apache/servicecomb-service-center/datasource/cache"
pb "github.com/go-chassis/cari/discovery"
"github.com/apache/servicecomb-service-center/datasource/mongo/client/dao"
"github.com/apache/servicecomb-service-center/datasource/mongo/client/model"
- mutil
"github.com/apache/servicecomb-service-center/datasource/mongo/util"
- "github.com/apache/servicecomb-service-center/pkg/log"
- "github.com/apache/servicecomb-service-center/pkg/util"
)
-func GetAllConsumerIds(ctx context.Context, provider *pb.MicroService) (allow
[]string, deny []string, _ error) {
+func GetAllConsumerIds(ctx context.Context, provider *pb.MicroService) (allow
[]string, deny []string, err error) {
if provider == nil || len(provider.ServiceId) == 0 {
return nil, nil, fmt.Errorf("invalid provider")
}
//todo 删除服务,最后实例推送有误差
- domain := util.ParseDomainProject(ctx)
- project := util.ParseProject(ctx)
- filter := mutil.NewDomainProjectFilter(domain, project,
mutil.ServiceID(provider.ServiceId))
- providerRules, err := dao.GetRules(ctx, filter)
- if err != nil {
- return nil, nil, err
+ providerRules, ok := cache.GetRulesByServiceID(provider.ServiceId)
+ if !ok {
+ providerRules, err = dao.GetRulesByServiceID(ctx,
provider.ServiceId)
+ if err != nil {
+ return nil, nil, err
+ }
}
allow, deny, err = GetConsumerIDsWithFilter(ctx, provider,
providerRules)
@@ -52,12 +49,23 @@ func GetAllConsumerIds(ctx context.Context, provider
*pb.MicroService) (allow []
}
func GetConsumerIDsWithFilter(ctx context.Context, provider *pb.MicroService,
rules []*model.Rule) (allow []string, deny []string, err error) {
- domainProject := util.ParseDomainProject(ctx)
- dr := NewProviderDependencyRelation(ctx, domainProject, provider)
- consumerIDs, err := dr.GetDependencyConsumerIds()
- if err != nil {
- log.Error(fmt.Sprintf("get service[%s]'s consumerIds failed",
provider.ServiceId), err)
- return nil, nil, err
+ serviceDeps, ok := cache.GetProviderServiceOfDeps(provider)
+ if !ok {
+ serviceDeps, err = dao.GetProviderDeps(ctx, provider)
+ if err != nil {
+ return nil, nil, err
+ }
+ }
+ consumerIDs := make([]string, len(serviceDeps.Dependency))
+ for _, serviceKeys := range serviceDeps.Dependency {
+ id, ok := cache.GetServiceID(ctx, serviceKeys)
+ if !ok {
+ id, err = dao.GetServiceID(ctx, serviceKeys)
+ if err != nil {
+ return nil, nil, err
+ }
+ }
+ consumerIDs = append(consumerIDs, id)
}
return FilterAll(ctx, consumerIDs, rules)
}
diff --git a/datasource/mongo/dependency_query.go
b/datasource/mongo/dependency_query.go
index f307680..7c60355 100644
--- a/datasource/mongo/dependency_query.go
+++ b/datasource/mongo/dependency_query.go
@@ -635,7 +635,7 @@ func GenerateRuleKeyWithSameServiceNameAndAppID(serviceType
string, domainProjec
util.ServiceType(serviceType),
util.ServiceKeyTenant(domainProject),
util.ServiceKeyAppID(in.AppId),
- util.ServiceServiceName(in.ServiceName),
+ util.ServiceKeyServiceName(in.ServiceName),
)
}
@@ -660,6 +660,6 @@ func GenerateServiceDependencyRuleKey(serviceType string,
domainProject string,
util.ServiceKeyServiceEnv(in.Environment),
util.ServiceKeyAppID(in.AppId),
util.ServiceKeyServiceVersion(in.Version),
- util.ServiceServiceName(in.ServiceName),
+ util.ServiceKeyServiceName(in.ServiceName),
)
}
diff --git a/datasource/mongo/event/instance_event_handler.go
b/datasource/mongo/event/instance_event_handler.go
index aaad685..f367c15 100644
--- a/datasource/mongo/event/instance_event_handler.go
+++ b/datasource/mongo/event/instance_event_handler.go
@@ -19,13 +19,10 @@ package event
import (
"context"
- "errors"
"fmt"
+ "github.com/apache/servicecomb-service-center/datasource/cache"
"time"
- "github.com/go-chassis/cari/discovery"
- "go.mongodb.org/mongo-driver/bson"
-
"github.com/apache/servicecomb-service-center/datasource"
"github.com/apache/servicecomb-service-center/datasource/mongo"
"github.com/apache/servicecomb-service-center/datasource/mongo/client/dao"
@@ -38,6 +35,7 @@ import (
"github.com/apache/servicecomb-service-center/server/metrics"
"github.com/apache/servicecomb-service-center/server/notify"
"github.com/apache/servicecomb-service-center/server/syncernotify"
+ "github.com/go-chassis/cari/discovery"
)
// InstanceEventHandler is the handler to handle events
@@ -51,15 +49,27 @@ func (h InstanceEventHandler) Type() string {
func (h InstanceEventHandler) OnEvent(evt sd.MongoEvent) {
action := evt.Type
+ if evt.Type == discovery.EVT_UPDATE {
+ return
+ }
instance := evt.Value.(model.Instance)
providerID := instance.Instance.ServiceId
providerInstanceID := instance.Instance.InstanceId
domainProject := instance.Domain + "/" + instance.Project
- cacheService := sd.Store().Service().Cache().Get(providerID)
- var microService *discovery.MicroService
- if cacheService != nil {
- microService = cacheService.(model.Service).Service
+ ctx := util.SetDomainProject(context.Background(), instance.Domain,
instance.Project)
+ res, ok := cache.GetServiceByID(providerID)
+ var err error
+ if !ok {
+ res, err = dao.GetServiceByID(ctx, providerID)
+ if err != nil {
+ log.Error(fmt.Sprintf("caught [%s] instance[%s/%s]
event, endpoints %v, get provider's file failed from db\n",
+ action, providerID, providerInstanceID,
instance.Instance.Endpoints), err)
+ }
}
+ if res == nil {
+ return
+ }
+ microService := res.Service
switch action {
case discovery.EVT_INIT:
metrics.ReportInstances(instance.Domain, increaseOne)
@@ -69,30 +79,10 @@ func (h InstanceEventHandler) OnEvent(evt sd.MongoEvent) {
metrics.ReportInstances(instance.Domain, increaseOne)
case discovery.EVT_DELETE:
metrics.ReportInstances(instance.Domain, decreaseOne)
- // to report quota
- }
- if microService == nil {
- log.Info("get cached service failed, then get from database")
- service, err := dao.GetService(context.Background(),
bson.M{"serviceinfo.serviceid": providerID})
- if err != nil {
- if errors.Is(err, datasource.ErrNoData) {
- log.Warn(fmt.Sprintf("there is no service with
id [%s] in the database", providerID))
- } else {
- log.Error("query database error", err)
- }
- return
- }
- microService = service.Service // service in the cache may not
ready, query from db once
- if microService == nil {
- log.Warn(fmt.Sprintf("caught [%s] instance[%s/%s]
event, endpoints %v, get provider's file failed from db\n",
- action, providerID, providerInstanceID,
instance.Instance.Endpoints))
- return
- }
}
if !syncernotify.GetSyncerNotifyCenter().Closed() {
NotifySyncerInstanceEvent(evt, microService)
}
- ctx := util.SetDomainProject(context.Background(), instance.Domain,
instance.Project)
consumerIDS, _, err := mongo.GetAllConsumerIds(ctx, microService)
if err != nil {
log.Error(fmt.Sprintf("get service[%s][%s/%s/%s/%s]'s
consumerIDs failed",
diff --git a/datasource/mongo/event/instance_event_handler_test.go
b/datasource/mongo/event/instance_event_handler_test.go
index a701795..c802c5f 100644
--- a/datasource/mongo/event/instance_event_handler_test.go
+++ b/datasource/mongo/event/instance_event_handler_test.go
@@ -92,7 +92,6 @@ func mongoAssign() sd.MongoEvent {
mongoEvent.DocumentID = "5fdc483b4a885f69317e3505"
mongoEvent.Value = mongoInstance
mongoEvent.Type = discovery.EVT_CREATE
- mongoEvent.ResourceID = "f73dceb440f711eba63ffa163e7cdcb8"
return mongoEvent
}
@@ -112,7 +111,6 @@ func mongoEventWronServiceId() sd.MongoEvent {
mongoEvent.DocumentID = "5fdc483b4a885f69317e3505"
mongoEvent.Value = mongoInstance
mongoEvent.Type = discovery.EVT_CREATE
- mongoEvent.ResourceID = "f73dceb440f711eba63ffa163e7cdcb8"
return mongoEvent
}
diff --git a/datasource/mongo/ms.go b/datasource/mongo/ms.go
index 3d224b8..4d6c508 100644
--- a/datasource/mongo/ms.go
+++ b/datasource/mongo/ms.go
@@ -23,6 +23,7 @@ import (
"encoding/json"
"errors"
"fmt"
+ "github.com/apache/servicecomb-service-center/datasource/cache"
"reflect"
"regexp"
"sort"
@@ -41,7 +42,6 @@ import (
"github.com/apache/servicecomb-service-center/datasource/mongo/client/dao"
"github.com/apache/servicecomb-service-center/datasource/mongo/client/model"
"github.com/apache/servicecomb-service-center/datasource/mongo/heartbeat"
- "github.com/apache/servicecomb-service-center/datasource/mongo/sd"
mutil
"github.com/apache/servicecomb-service-center/datasource/mongo/util"
"github.com/apache/servicecomb-service-center/pkg/gopool"
"github.com/apache/servicecomb-service-center/pkg/log"
@@ -197,18 +197,22 @@ func (ds *DataSource) GetApplications(ctx
context.Context, request *discovery.Ge
}
func (ds *DataSource) GetService(ctx context.Context, request
*discovery.GetServiceRequest) (*discovery.GetServiceResponse, error) {
- svc, err := GetServiceByID(ctx, request.ServiceId)
- if err != nil {
- if errors.Is(err, datasource.ErrNoData) {
- log.Debug(fmt.Sprintf("service %s not exist in db",
request.ServiceId))
+ svc, ok := cache.GetServiceByID(request.ServiceId)
+ if !ok {
+ var err error
+ svc, err = dao.GetServiceByID(ctx, request.ServiceId)
+ if err != nil {
+ if errors.Is(err, datasource.ErrNoData) {
+ log.Debug(fmt.Sprintf("service %s not exist in
db", request.ServiceId))
+ return &discovery.GetServiceResponse{
+ Response:
discovery.CreateResponse(discovery.ErrServiceNotExists, "Service not exist."),
+ }, nil
+ }
+ log.Error(fmt.Sprintf("failed to get single service %s
from mongo", request.ServiceId), err)
return &discovery.GetServiceResponse{
- Response:
discovery.CreateResponse(discovery.ErrServiceNotExists, "Service not exist."),
- }, nil
+ Response:
discovery.CreateResponse(discovery.ErrInternal, "get service data from mongodb
failed."),
+ }, err
}
- log.Error(fmt.Sprintf("failed to get single service %s from
mongo", request.ServiceId), err)
- return &discovery.GetServiceResponse{
- Response:
discovery.CreateResponse(discovery.ErrInternal, "get service data from mongodb
failed."),
- }, err
}
return &discovery.GetServiceResponse{
Response: discovery.CreateResponse(discovery.ResponseSuccess,
"Get service successfully."),
@@ -1521,13 +1525,17 @@ func (ds *DataSource) GetInstance(ctx context.Context,
request *discovery.GetOne
Response:
discovery.CreateResponse(discovery.ErrInstanceNotExists, mes.Error()),
}, nil
}
- instances, err := GetInstancesByServiceID(ctx,
request.ProviderServiceId)
- if err != nil {
- log.Error(fmt.Sprintf("get instance failed %s", findFlag()),
err)
- return &discovery.GetOneInstanceResponse{
- Response:
discovery.CreateResponse(discovery.ErrInternal, err.Error()),
- }, err
+ instances, ok :=
cache.GetMicroServiceInstancesByID(request.ProviderServiceId)
+ if !ok {
+ instances, err = dao.GetMicroServiceInstancesByID(ctx,
request.ProviderServiceId)
+ if err != nil {
+ log.Error(fmt.Sprintf("get instance failed %s",
findFlag()), err)
+ return &discovery.GetOneInstanceResponse{
+ Response:
discovery.CreateResponse(discovery.ErrInternal, err.Error()),
+ }, err
+ }
}
+
instance := instances[0]
// use explicit instanceId to query
if len(request.ProviderInstanceId) != 0 {
@@ -1566,17 +1574,41 @@ func (ds *DataSource) GetInstances(ctx context.Context,
request *discovery.GetIn
var err error
if len(request.ConsumerServiceId) > 0 {
- service, err = GetServiceByID(ctx, request.ConsumerServiceId)
+ var exist bool
+ service, exist = cache.GetServiceByID(request.ConsumerServiceId)
+ if !exist {
+ service, err = dao.GetServiceByID(ctx,
request.ConsumerServiceId)
+ if err != nil {
+ if errors.Is(err, datasource.ErrNoData) {
+ log.Debug(fmt.Sprintf("consumer does
not exist, consumer %s find provider %s instances",
+ request.ConsumerServiceId,
request.ProviderServiceId))
+ return &discovery.GetInstancesResponse{
+ Response:
discovery.CreateResponse(discovery.ErrServiceNotExists,
+
fmt.Sprintf("Consumer[%s] does not exist.", request.ConsumerServiceId)),
+ }, nil
+ }
+ log.Error(fmt.Sprintf("get consumer failed,
consumer %s find provider %s instances",
+ request.ConsumerServiceId,
request.ProviderServiceId), err)
+ return &discovery.GetInstancesResponse{
+ Response:
discovery.CreateResponse(discovery.ErrInternal, err.Error()),
+ }, err
+ }
+ }
+ }
+
+ provider, ok := cache.GetServiceByID(request.ProviderServiceId)
+ if !ok {
+ provider, err = dao.GetServiceByID(ctx,
request.ProviderServiceId)
if err != nil {
if errors.Is(err, datasource.ErrNoData) {
- log.Debug(fmt.Sprintf("consumer does not exist,
consumer %s find provider %s instances",
+ log.Debug(fmt.Sprintf("provider does not exist,
consumer %s find provider %s instances",
request.ConsumerServiceId,
request.ProviderServiceId))
return &discovery.GetInstancesResponse{
Response:
discovery.CreateResponse(discovery.ErrServiceNotExists,
- fmt.Sprintf("Consumer[%s] does
not exist.", request.ConsumerServiceId)),
+ fmt.Sprintf("provider[%s] does
not exist.", request.ProviderServiceId)),
}, nil
}
- log.Error(fmt.Sprintf("get consumer failed, consumer %s
find provider %s instances",
+ log.Error(fmt.Sprintf("get provider failed, consumer %s
find provider instances %s",
request.ConsumerServiceId,
request.ProviderServiceId), err)
return &discovery.GetInstancesResponse{
Response:
discovery.CreateResponse(discovery.ErrInternal, err.Error()),
@@ -1584,23 +1616,6 @@ func (ds *DataSource) GetInstances(ctx context.Context,
request *discovery.GetIn
}
}
- provider, err := GetServiceByID(ctx, request.ProviderServiceId)
- if err != nil {
- if errors.Is(err, datasource.ErrNoData) {
- log.Debug(fmt.Sprintf("provider does not exist,
consumer %s find provider %s instances",
- request.ConsumerServiceId,
request.ProviderServiceId))
- return &discovery.GetInstancesResponse{
- Response:
discovery.CreateResponse(discovery.ErrServiceNotExists,
- fmt.Sprintf("provider[%s] does not
exist.", request.ProviderServiceId)),
- }, nil
- }
- log.Error(fmt.Sprintf("get provider failed, consumer %s find
provider instances %s",
- request.ConsumerServiceId, request.ProviderServiceId),
err)
- return &discovery.GetInstancesResponse{
- Response:
discovery.CreateResponse(discovery.ErrInternal, err.Error()),
- }, err
- }
-
findFlag := func() string {
return fmt.Sprintf("consumer[%s][%s/%s/%s/%s] find
provider[%s][%s/%s/%s/%s] instances",
request.ConsumerServiceId, service.Service.Environment,
service.Service.AppId, service.Service.ServiceName, service.Service.Version,
@@ -1616,12 +1631,15 @@ func (ds *DataSource) GetInstances(ctx context.Context,
request *discovery.GetIn
Response:
discovery.CreateResponse(discovery.ErrServiceNotExists, mes.Error()),
}, nil
}
- instances, err := GetInstancesByServiceID(ctx,
request.ProviderServiceId)
- if err != nil {
- log.Error(fmt.Sprintf("get instances failed %s", findFlag()),
err)
- return &discovery.GetInstancesResponse{
- Response:
discovery.CreateResponse(discovery.ErrInternal, err.Error()),
- }, err
+ instances, ok :=
cache.GetMicroServiceInstancesByID(request.ProviderServiceId)
+ if !ok {
+ instances, err = dao.GetMicroServiceInstancesByID(ctx,
request.ProviderServiceId)
+ if err != nil {
+ log.Error(fmt.Sprintf("get instances failed %s",
findFlag()), err)
+ return &discovery.GetInstancesResponse{
+ Response:
discovery.CreateResponse(discovery.ErrInternal, err.Error()),
+ }, err
+ }
}
newRev, _ := formatRevision(request.ConsumerServiceId, instances)
if rev == newRev {
@@ -2603,49 +2621,6 @@ func allowAcrossDimension(ctx context.Context,
providerService *model.Service, c
return nil
}
-func GetServiceByID(ctx context.Context, serviceID string) (*model.Service,
error) {
- cacheService, ok :=
sd.Store().Service().Cache().Get(serviceID).(model.Service)
- if !ok {
- //no service in cache,get it from mongodb
- filter := mutil.NewBasicFilter(ctx,
mutil.ServiceServiceID(serviceID))
- return dao.GetService(ctx, filter)
- }
- return cacheToService(cacheService), nil
-}
-
-func cacheToService(service model.Service) *model.Service {
- return &model.Service{
- Domain: service.Domain,
- Project: service.Project,
- Tags: service.Tags,
- Service: service.Service,
- }
-}
-
-func GetInstancesByServiceID(ctx context.Context, serviceID string)
([]*discovery.MicroServiceInstance, error) {
- var res []*discovery.MicroServiceInstance
- var cacheUnavailable bool
- cacheInstances := sd.Store().Instance().Cache().GetIndexData(serviceID)
- for _, instID := range cacheInstances {
- inst, ok :=
sd.Store().Instance().Cache().Get(instID).(model.Instance)
- if !ok {
- cacheUnavailable = true
- break
- }
- res = append(res, inst.Instance)
- }
- if cacheUnavailable || len(res) == 0 {
- filter := mutil.NewFilter(mutil.InstanceServiceID(serviceID))
- option := &options.FindOptions{Sort:
bson.M{mutil.ConnectWithDot([]string{model.ColumnInstance,
model.ColumnVersion}): -1}}
- res, err := dao.GetMicroServiceInstances(ctx, filter, option)
- if err != nil {
- return nil, err
- }
- return res, nil
- }
- return res, nil
-}
-
func DeleteDependencyForDeleteService(domainProject string, serviceID string,
service *discovery.MicroServiceKey) error {
conDep := new(discovery.ConsumerDependency)
conDep.Consumer = service
diff --git a/datasource/mongo/rule_util.go b/datasource/mongo/rule_util.go
index e4bd435..a6fbf2c 100644
--- a/datasource/mongo/rule_util.go
+++ b/datasource/mongo/rule_util.go
@@ -19,7 +19,7 @@ package mongo
import (
"context"
-
+ "github.com/apache/servicecomb-service-center/datasource/cache"
"github.com/go-chassis/cari/discovery"
"github.com/apache/servicecomb-service-center/datasource/mongo/client/dao"
@@ -29,9 +29,13 @@ import (
)
func Filter(ctx context.Context, rules []*model.Rule, consumerID string)
(bool, error) {
- consumer, err := GetServiceByID(ctx, consumerID)
- if consumer == nil {
- return false, err
+ consumer, ok := cache.GetServiceByID(consumerID)
+ if !ok {
+ var err error
+ consumer, err = dao.GetServiceByID(ctx, consumerID)
+ if err != nil {
+ return false, err
+ }
}
if len(rules) == 0 {
diff --git a/datasource/mongo/sd/dep_cache.go b/datasource/mongo/sd/dep_cache.go
new file mode 100644
index 0000000..9619c5f
--- /dev/null
+++ b/datasource/mongo/sd/dep_cache.go
@@ -0,0 +1,132 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+ */
+
+package sd
+
+import (
+ "strings"
+
+
"github.com/apache/servicecomb-service-center/datasource/mongo/client/model"
+ "github.com/apache/servicecomb-service-center/datasource/sdcommon"
+ "go.mongodb.org/mongo-driver/bson"
+)
+
+type depStore struct {
+ dirty bool
+ d *DocStore
+ indexCache *indexCache
+}
+
+func init() {
+ RegisterCacher(dep, newDepStore)
+}
+
+func newDepStore() *MongoCacher {
+ options := DefaultOptions().SetTable(dep)
+ cache := &depStore{
+ dirty: false,
+ d: NewDocStore(),
+ indexCache: NewIndexCache(),
+ }
+ depUnmarshal := func(doc bson.Raw) (resource sdcommon.Resource) {
+ docID := MongoDocument{}
+ err := bson.Unmarshal(doc, &docID)
+ if err != nil {
+ return
+ }
+ dep := model.DependencyRule{}
+ err = bson.Unmarshal(doc, &dep)
+ if err != nil {
+ return
+ }
+ resource.Value = dep
+ resource.Key = docID.ID.Hex()
+ return
+ }
+ return NewMongoCacher(options, cache, depUnmarshal)
+}
+
+func (s *depStore) Name() string {
+ return dep
+}
+
+func (s *depStore) Size() int {
+ return s.d.Size()
+}
+
+func (s *depStore) Get(key string) interface{} {
+ return s.d.Get(key)
+}
+
+func (s *depStore) ForEach(iter func(k string, v interface{}) (next bool)) {
+ s.d.ForEach(iter)
+}
+
+func (s *depStore) GetValue(index string) []interface{} {
+ docs := s.indexCache.Get(index)
+ res := make([]interface{}, 0, len(docs))
+ for _, v := range docs {
+ res = append(res, s.d.Get(v))
+ }
+ return res
+}
+
+func (s *depStore) Dirty() bool {
+ return s.dirty
+}
+
+func (s *depStore) MarkDirty() {
+ s.dirty = true
+}
+
+func (s *depStore) Clear() {
+ s.dirty = false
+ s.d.store.Flush()
+}
+
+func (s *depStore) ProcessUpdate(event MongoEvent) {
+ dep, ok := event.Value.(model.DependencyRule)
+ if !ok {
+ return
+ }
+ if dep.ServiceKey == nil {
+ return
+ }
+ s.d.Put(event.DocumentID, event.Value)
+ s.indexCache.Put(genDepServiceKey(dep), event.DocumentID)
+ return
+}
+
+func (s *depStore) ProcessDelete(event MongoEvent) {
+ dep, ok := s.d.Get(event.DocumentID).(model.DependencyRule)
+ if !ok {
+ return
+ }
+ if dep.ServiceKey == nil {
+ return
+ }
+ s.d.DeleteDoc(event.DocumentID)
+ s.indexCache.Delete(genDepServiceKey(dep), event.DocumentID)
+}
+
+func (s *depStore) isValueNotUpdated(value interface{}, newValue interface{})
bool {
+ return false
+}
+
+func genDepServiceKey(dep model.DependencyRule) string {
+ return strings.Join([]string{dep.Type, dep.ServiceKey.AppId,
dep.ServiceKey.ServiceName, dep.ServiceKey.Version}, "/")
+}
diff --git a/datasource/mongo/sd/depcache_test.go
b/datasource/mongo/sd/depcache_test.go
new file mode 100644
index 0000000..fa7f47e
--- /dev/null
+++ b/datasource/mongo/sd/depcache_test.go
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package sd
+
+import (
+ "testing"
+
+
"github.com/apache/servicecomb-service-center/datasource/mongo/client/model"
+ "github.com/go-chassis/cari/discovery"
+ "github.com/stretchr/testify/assert"
+)
+
+var depCache *MongoCacher
+
+var depRule1 = model.DependencyRule{
+ Type: "p",
+ Domain: "default",
+ Project: "default",
+ ServiceKey: &discovery.MicroServiceKey{
+ Tenant: "default/default",
+ Environment: "prod",
+ AppId: "appid1",
+ ServiceName: "svc1",
+ Alias: "alias1",
+ Version: "1.0",
+ },
+}
+
+var depRule2 = model.DependencyRule{
+ Type: "p",
+ Domain: "default",
+ Project: "default",
+ ServiceKey: &discovery.MicroServiceKey{
+ Tenant: "default/default",
+ Environment: "prod",
+ AppId: "appid1",
+ ServiceName: "svc1",
+ Alias: "alias1",
+ Version: "1.0",
+ },
+}
+
+func init() {
+ depCache = newDepStore()
+}
+
+func TestDepCacheBasicFunc(t *testing.T) {
+ t.Run("init depcache, should pass", func(t *testing.T) {
+ depCache := newDepStore()
+ assert.NotNil(t, depCache)
+ assert.Equal(t, dep, depCache.cache.Name())
+ })
+ event1 := MongoEvent{
+ DocumentID: "id1",
+ Value: depRule1,
+ }
+ event2 := MongoEvent{
+ DocumentID: "id2",
+ Value: depRule2,
+ }
+ t.Run("update&&delete depcache, should pass", func(t *testing.T) {
+ depCache.cache.ProcessUpdate(event1)
+ assert.Equal(t, depCache.cache.Size(), 1)
+ assert.Nil(t, depCache.cache.Get("id_not_exist"))
+ assert.Equal(t, depRule1.ServiceKey.ServiceName,
depCache.cache.Get("id1").(model.DependencyRule).ServiceKey.ServiceName)
+ assert.Len(t, depCache.cache.GetValue("p/appid1/svc1/1.0"), 1)
+ depCache.cache.ProcessUpdate(event2)
+ assert.Len(t, depCache.cache.GetValue("p/appid1/svc1/1.0"), 2)
+ depCache.cache.ProcessDelete(event1)
+ assert.Nil(t, depCache.cache.Get("id1"))
+ assert.Len(t, depCache.cache.GetValue("p/appid1/svc1/1.0"), 1)
+ depCache.cache.ProcessDelete(event2)
+ assert.Len(t, depCache.cache.GetValue("p/appid1/svc1/1.0"), 0)
+ assert.Nil(t, depCache.cache.Get("id2"))
+ })
+}
diff --git a/datasource/mongo/sd/cache.go b/datasource/mongo/sd/doc_cache.go
similarity index 53%
copy from datasource/mongo/sd/cache.go
copy to datasource/mongo/sd/doc_cache.go
index 9819041..d594238 100644
--- a/datasource/mongo/sd/cache.go
+++ b/datasource/mongo/sd/doc_cache.go
@@ -17,30 +17,44 @@
package sd
-// Cache stores db data.
-type Cache interface {
- CacheReader
+import "github.com/patrickmn/go-cache"
- Put(id string, v interface{})
-
- Remove(id string)
-
- MarkDirty()
+type DocStore struct {
+ store *cache.Cache
+}
- Dirty() bool
+func NewDocStore() *DocStore {
+ return &DocStore{store: cache.New(cache.NoExpiration, 0)}
+}
- Clear()
+func (c *DocStore) Get(key string) (v interface{}) {
+ v, ok := c.store.Get(key)
+ if !ok {
+ return nil
+ }
+ return
}
-// CacheReader reads k-v data.
-type CacheReader interface {
- Name() string // The name of implementation
+func (c *DocStore) Put(key string, v interface{}) {
+ c.store.Set(key, v, cache.NoExpiration)
+}
- Size() int // the bytes size of the cache
+func (c *DocStore) DeleteDoc(key string) {
+ c.store.Delete(key)
+}
- // Get gets a value by id
- Get(id string) interface{}
+func (c *DocStore) ForEach(iter func(k string, v interface{}) (next bool)) {
+ items := c.store.Items()
+ for k, v := range items {
+ if v.Object == nil {
+ continue
+ }
+ if !iter(k, v) {
+ break
+ }
+ }
+}
- // ForEach executes the given function for each of the k-v
- ForEach(iter func(k string, v interface{}) (next bool))
+func (c *DocStore) Size() int {
+ return c.store.ItemCount()
}
diff --git a/datasource/mongo/sd/options_test.go
b/datasource/mongo/sd/docc_test.go
similarity index 51%
copy from datasource/mongo/sd/options_test.go
copy to datasource/mongo/sd/docc_test.go
index 58fc401..47ce9a6 100644
--- a/datasource/mongo/sd/options_test.go
+++ b/datasource/mongo/sd/docc_test.go
@@ -17,45 +17,34 @@
* under the License.
*/
-package sd
+package sd_test
import (
"testing"
- "github.com/apache/servicecomb-service-center/pkg/log"
- "github.com/go-chassis/cari/discovery"
+ "github.com/apache/servicecomb-service-center/datasource/mongo/sd"
"github.com/stretchr/testify/assert"
)
-func TestOptions(t *testing.T) {
- options := Options{
- Key: "",
- }
- assert.Empty(t, options, "config is empty")
+var docCache *sd.DocStore
- options1 := options.SetTable("configKey")
- assert.Equal(t, "configKey", options1.Key,
- "contain key after method WithTable")
-
- assert.Equal(t, 0, options1.InitSize,
- "init size is zero")
-
- mongoEventFunc = mongoEventFuncGet()
-
- out := options1.String()
- assert.NotNil(t, out,
- "method String return not after methods")
+func init() {
+ docCache = sd.NewDocStore()
}
-var mongoEventFunc MongoEventFunc
-
-func mongoEventFuncGet() MongoEventFunc {
- fun := func(evt MongoEvent) {
- evt.DocumentID = "DocumentID has changed"
- evt.ResourceID = "BusinessID has changed"
- evt.Value = 2
- evt.Type = discovery.EVT_UPDATE
- log.Info("in event func")
- }
- return fun
+func TestDocCache(t *testing.T) {
+ t.Run("init docCache,should pass", func(t *testing.T) {
+ docCache = sd.NewDocStore()
+ assert.NotNil(t, docCache)
+ assert.Nil(t, docCache.Get("id1"))
+ })
+ t.Run("update&&delete docCache, should pass", func(t *testing.T) {
+ docCache.Put("id1", "doc1")
+ assert.Equal(t, "doc1", docCache.Get("id1").(string))
+ assert.Equal(t, 1, docCache.Size())
+ docCache.Put("id2", "doc2")
+ assert.Equal(t, 2, docCache.Size())
+ docCache.DeleteDoc("id2")
+ assert.Equal(t, 1, docCache.Size())
+ })
}
diff --git a/datasource/mongo/sd/event_proxy_test.go
b/datasource/mongo/sd/event_proxy_test.go
index 7fb47e5..4b9b577 100644
--- a/datasource/mongo/sd/event_proxy_test.go
+++ b/datasource/mongo/sd/event_proxy_test.go
@@ -34,7 +34,6 @@ func TestAddHandleFuncAndOnEvent(t *testing.T) {
}
mongoEvent := MongoEvent{
DocumentID: "",
- ResourceID: "",
Type: discovery.EVT_CREATE,
Value: 1,
}
diff --git a/datasource/mongo/sd/cache.go b/datasource/mongo/sd/hset.go
similarity index 53%
rename from datasource/mongo/sd/cache.go
rename to datasource/mongo/sd/hset.go
index 9819041..0ee909c 100644
--- a/datasource/mongo/sd/cache.go
+++ b/datasource/mongo/sd/hset.go
@@ -17,30 +17,54 @@
package sd
-// Cache stores db data.
-type Cache interface {
- CacheReader
+import (
+ "sync"
+)
- Put(id string, v interface{})
-
- Remove(id string)
-
- MarkDirty()
-
- Dirty() bool
+type Hset struct {
+ m map[string]struct{}
+ l sync.RWMutex
+}
- Clear()
+func Newhset(key string) *Hset {
+ mt := make(map[string]struct{})
+ mt[key] = struct{}{}
+ return &Hset{
+ m: mt,
+ l: sync.RWMutex{},
+ }
}
-// CacheReader reads k-v data.
-type CacheReader interface {
- Name() string // The name of implementation
+func (h *Hset) Insert(key string) {
+ h.l.Lock()
+ defer h.l.Unlock()
+ _, exist := h.m[key]
+ if exist {
+ return
+ }
+ h.m[key] = struct{}{}
+}
- Size() int // the bytes size of the cache
+func (h *Hset) Del(key string) {
+ h.l.Lock()
+ defer h.l.Unlock()
+ _, exist := h.m[key]
+ if !exist {
+ return
+ }
+ delete(h.m, key)
+}
- // Get gets a value by id
- Get(id string) interface{}
+func (h *Hset) Len() int {
+ return len(h.m)
+}
- // ForEach executes the given function for each of the k-v
- ForEach(iter func(k string, v interface{}) (next bool))
+func (h *Hset) Iter() []string {
+ h.l.RLock()
+ defer h.l.RUnlock()
+ res := make([]string, 0, h.Len())
+ for k, _ := range h.m {
+ res = append(res, k)
+ }
+ return res
}
diff --git a/datasource/mongo/sd/options_test.go
b/datasource/mongo/sd/hset_test.go
similarity index 51%
copy from datasource/mongo/sd/options_test.go
copy to datasource/mongo/sd/hset_test.go
index 58fc401..b0ec440 100644
--- a/datasource/mongo/sd/options_test.go
+++ b/datasource/mongo/sd/hset_test.go
@@ -17,45 +17,24 @@
* under the License.
*/
-package sd
+package sd_test
import (
"testing"
- "github.com/apache/servicecomb-service-center/pkg/log"
- "github.com/go-chassis/cari/discovery"
+ "github.com/apache/servicecomb-service-center/datasource/mongo/sd"
"github.com/stretchr/testify/assert"
)
-func TestOptions(t *testing.T) {
- options := Options{
- Key: "",
- }
- assert.Empty(t, options, "config is empty")
-
- options1 := options.SetTable("configKey")
- assert.Equal(t, "configKey", options1.Key,
- "contain key after method WithTable")
-
- assert.Equal(t, 0, options1.InitSize,
- "init size is zero")
-
- mongoEventFunc = mongoEventFuncGet()
-
- out := options1.String()
- assert.NotNil(t, out,
- "method String return not after methods")
-}
-
-var mongoEventFunc MongoEventFunc
-
-func mongoEventFuncGet() MongoEventFunc {
- fun := func(evt MongoEvent) {
- evt.DocumentID = "DocumentID has changed"
- evt.ResourceID = "BusinessID has changed"
- evt.Value = 2
- evt.Type = discovery.EVT_UPDATE
- log.Info("in event func")
- }
- return fun
+func TestHset(t *testing.T) {
+ hset := sd.Newhset("key1")
+ assert.NotNil(t, hset)
+ assert.Equal(t, 1, hset.Len())
+ hset.Insert("key1")
+ assert.Equal(t, 1, hset.Len())
+ hset.Insert("key2")
+ assert.Equal(t, 2, hset.Len())
+ hset.Del("key1")
+ assert.Equal(t, 1, hset.Len())
+ assert.Equal(t, []string{"key2"}, hset.Iter())
}
diff --git a/datasource/mongo/sd/index_cache.go
b/datasource/mongo/sd/index_cache.go
new file mode 100644
index 0000000..1977bb8
--- /dev/null
+++ b/datasource/mongo/sd/index_cache.go
@@ -0,0 +1,71 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+ */
+
+package sd
+
+import (
+ "github.com/patrickmn/go-cache"
+)
+
+type indexCache struct {
+ store *cache.Cache
+}
+
+func NewIndexCache() *indexCache {
+ return &indexCache{
+ store: cache.New(cache.NoExpiration, 0),
+ }
+}
+
+func (i *indexCache) Get(key string) []string {
+ if v, found := i.store.Get(key); found {
+ hset, ok := v.(*Hset)
+ if ok {
+ return hset.Iter()
+ }
+ }
+ return nil
+}
+
+func (i *indexCache) Put(key string, value string) {
+ //todo this should be atomic
+ v, found := i.store.Get(key)
+ if !found {
+ i.store.Set(key, Newhset(value), cache.NoExpiration)
+ return
+ }
+ set, ok := v.(*Hset)
+ if !ok {
+ return
+ }
+ set.Insert(value)
+}
+
+func (i *indexCache) Delete(key string, value string) {
+ v, found := i.store.Get(key)
+ if !found {
+ return
+ }
+ set, ok := v.(*Hset)
+ if !ok {
+ return
+ }
+ set.Del(value)
+ if set.Len() == 0 {
+ i.store.Delete(key)
+ }
+}
diff --git a/datasource/mongo/sd/indexc_test.go
b/datasource/mongo/sd/indexc_test.go
new file mode 100644
index 0000000..169a7d7
--- /dev/null
+++ b/datasource/mongo/sd/indexc_test.go
@@ -0,0 +1,21 @@
+package sd_test
+
+import (
+ "testing"
+
+ "github.com/apache/servicecomb-service-center/datasource/mongo/sd"
+ "github.com/stretchr/testify/assert"
+)
+
+func TestIndexCache(t *testing.T) {
+ indexCache := sd.NewIndexCache()
+ assert.NotNil(t, indexCache)
+ indexCache.Put("index1", "doc1")
+ assert.Equal(t, []string{"doc1"}, indexCache.Get("index1"))
+ indexCache.Put("index1", "doc2")
+ assert.Len(t, indexCache.Get("index1"), 2)
+ indexCache.Delete("index1", "doc1")
+ assert.Len(t, indexCache.Get("index1"), 1)
+ indexCache.Delete("index1", "doc2")
+ assert.Nil(t, indexCache.Get("index1"))
+}
diff --git a/datasource/mongo/sd/instance_cache.go
b/datasource/mongo/sd/instance_cache.go
new file mode 100644
index 0000000..a053cec
--- /dev/null
+++ b/datasource/mongo/sd/instance_cache.go
@@ -0,0 +1,129 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+ */
+
+package sd
+
+import (
+
"github.com/apache/servicecomb-service-center/datasource/mongo/client/model"
+ "github.com/apache/servicecomb-service-center/datasource/sdcommon"
+ rmodel "github.com/go-chassis/cari/discovery"
+ "go.mongodb.org/mongo-driver/bson"
+)
+
+type instanceStore struct {
+ dirty bool
+ d *DocStore
+ indexCache *indexCache
+}
+
+func init() {
+ RegisterCacher(instance, newInstanceStore)
+}
+
+func newInstanceStore() *MongoCacher {
+ options := DefaultOptions().SetTable(instance)
+ cache := &instanceStore{
+ dirty: false,
+ d: NewDocStore(),
+ indexCache: NewIndexCache(),
+ }
+ instanceUnmarshal := func(doc bson.Raw) (resource sdcommon.Resource) {
+ docID := MongoDocument{}
+ err := bson.Unmarshal(doc, &docID)
+ if err != nil {
+ return
+ }
+ inst := model.Instance{}
+ err = bson.Unmarshal(doc, &inst)
+ if err != nil {
+ return
+ }
+ resource.Value = inst
+ resource.Key = docID.ID.Hex()
+ return
+ }
+ return NewMongoCacher(options, cache, instanceUnmarshal)
+}
+
+func (s *instanceStore) Name() string {
+ return instance
+}
+
+func (s *instanceStore) Size() int {
+ return s.d.Size()
+}
+
+func (s *instanceStore) Get(key string) interface{} {
+ return s.d.Get(key)
+}
+
+func (s *instanceStore) ForEach(iter func(k string, v interface{}) (next
bool)) {
+ s.d.ForEach(iter)
+}
+
+func (s *instanceStore) GetValue(index string) []interface{} {
+ docs := s.indexCache.Get(index)
+ res := make([]interface{}, 0, len(docs))
+ for _, v := range docs {
+ res = append(res, s.d.Get(v))
+ }
+ return res
+}
+
+func (s *instanceStore) Dirty() bool {
+ return s.dirty
+}
+
+func (s *instanceStore) MarkDirty() {
+ s.dirty = true
+}
+
+func (s *instanceStore) Clear() {
+ s.dirty = false
+ s.d.store.Flush()
+}
+
+func (s *instanceStore) ProcessUpdate(event MongoEvent) {
+ //instance only process create and del event.
+ if event.Type == rmodel.EVT_UPDATE {
+ return
+ }
+ inst, ok := event.Value.(model.Instance)
+ if !ok {
+ return
+ }
+ s.d.Put(event.DocumentID, event.Value)
+ s.indexCache.Put(genInstServiceID(inst), event.DocumentID)
+ return
+}
+
+func (s *instanceStore) ProcessDelete(event MongoEvent) {
+ inst, ok := s.d.Get(event.DocumentID).(model.Instance)
+ if !ok {
+ return
+ }
+ s.d.DeleteDoc(event.DocumentID)
+ s.indexCache.Delete(genInstServiceID(inst), event.DocumentID)
+}
+
+func (s *instanceStore) isValueNotUpdated(value interface{}, newValue
interface{}) bool {
+ return true
+}
+
+func genInstServiceID(inst model.Instance) string {
+ return inst.Instance.ServiceId
+}
diff --git a/datasource/mongo/sd/instancec_test.go
b/datasource/mongo/sd/instancec_test.go
new file mode 100644
index 0000000..9b2501d
--- /dev/null
+++ b/datasource/mongo/sd/instancec_test.go
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package sd
+
+import (
+ "testing"
+
+
"github.com/apache/servicecomb-service-center/datasource/mongo/client/model"
+ "github.com/go-chassis/cari/discovery"
+ "github.com/stretchr/testify/assert"
+)
+
+var instanceCache *MongoCacher
+
+var inst1 = model.Instance{
+ Domain: "default",
+ Project: "default",
+ Instance: &discovery.MicroServiceInstance{
+ InstanceId: "123456789",
+ ServiceId: "svcid",
+ },
+}
+
+var inst2 = model.Instance{
+ Domain: "default",
+ Project: "default",
+ Instance: &discovery.MicroServiceInstance{
+ InstanceId: "987654321",
+ ServiceId: "svcid",
+ },
+}
+
+func init() {
+ instanceCache = newInstanceStore()
+}
+
+func TestInstCacheBasicFunc(t *testing.T) {
+ t.Run("init instCache,should pass", func(t *testing.T) {
+ instanceCache := newInstanceStore()
+ assert.NotNil(t, instanceCache)
+ assert.Equal(t, instance, instanceCache.cache.Name())
+ })
+ event1 := MongoEvent{
+ DocumentID: "id1",
+ Value: inst1,
+ }
+ event2 := MongoEvent{
+ DocumentID: "id2",
+ Value: inst2,
+ }
+ t.Run("update&&delete instCache, should pass", func(t *testing.T) {
+ instanceCache.cache.ProcessUpdate(event1)
+ assert.Equal(t, instanceCache.cache.Size(), 1)
+ assert.Nil(t, instanceCache.cache.Get("id_not_exist"))
+ assert.Equal(t, inst1.Instance.InstanceId,
instanceCache.cache.Get("id1").(model.Instance).Instance.InstanceId)
+ assert.Len(t, instanceCache.cache.GetValue("svcid"), 1)
+ instanceCache.cache.ProcessUpdate(event2)
+ assert.Equal(t, instanceCache.cache.Size(), 2)
+ assert.Len(t, instanceCache.cache.GetValue("svcid"), 2)
+ instanceCache.cache.ProcessDelete(event1)
+ assert.Nil(t, instanceCache.cache.Get("id1"))
+ assert.Len(t, instanceCache.cache.GetValue("svcid"), 1)
+ instanceCache.cache.ProcessDelete(event2)
+ assert.Len(t, instanceCache.cache.GetValue("svcid"), 0)
+ assert.Nil(t, instanceCache.cache.Get("id2"))
+ assert.Len(t, instanceCache.cache.GetValue("svcid"), 0)
+ })
+}
diff --git a/datasource/mongo/sd/listwatch_inner.go
b/datasource/mongo/sd/listwatch_inner.go
index da4612e..3d567aa 100644
--- a/datasource/mongo/sd/listwatch_inner.go
+++ b/datasource/mongo/sd/listwatch_inner.go
@@ -26,15 +26,16 @@ import (
"go.mongodb.org/mongo-driver/mongo/options"
"github.com/apache/servicecomb-service-center/datasource/mongo/client"
-
"github.com/apache/servicecomb-service-center/datasource/mongo/client/model"
"github.com/apache/servicecomb-service-center/datasource/sdcommon"
"github.com/apache/servicecomb-service-center/pkg/log"
- "github.com/apache/servicecomb-service-center/pkg/util"
)
+type parsefunc func(doc bson.Raw) (resource sdcommon.Resource)
+
type mongoListWatch struct {
Key string
resumeToken bson.Raw
+ parseFunc parsefunc
}
func (lw *mongoListWatch) List(op sdcommon.ListWatchConfig)
(*sdcommon.ListWatchResp, error) {
@@ -52,7 +53,7 @@ func (lw *mongoListWatch) List(op sdcommon.ListWatchConfig)
(*sdcommon.ListWatch
lwRsp.Resources = make([]*sdcommon.Resource, 0)
for resp.Next(context.Background()) {
- info := lw.doParseDocumentToResource(resp.Current)
+ info := lw.parseFunc(resp.Current)
lwRsp.Resources = append(lwRsp.Resources, &info)
}
@@ -118,53 +119,6 @@ func (lw *mongoListWatch) DoWatch(ctx context.Context, f
func(*sdcommon.ListWatc
return err
}
-func (lw *mongoListWatch) doParseDocumentToResource(fullDocument bson.Raw)
(resource sdcommon.Resource) {
- var err error
-
- documentID := MongoDocument{}
- err = bson.Unmarshal(fullDocument, &documentID)
- if err != nil {
- return
- }
-
- resource.DocumentID = documentID.ID.Hex()
-
- switch lw.Key {
- case instance:
- instance := model.Instance{}
- err = bson.Unmarshal(fullDocument, &instance)
- if err != nil {
- log.Error("error to parse bson raw to documentInfo",
err)
- return
- }
- if instance.Instance == nil {
- log.Error(fmt.Sprintf("unexpect instance value,the
documentID is %s", resource.DocumentID), nil)
- return
- }
- resource.Key = instance.Instance.InstanceId
- resource.Value = instance
- resource.Index = instance.Instance.ServiceId
- case service:
- service := model.Service{}
- err := bson.Unmarshal(fullDocument, &service)
- if err != nil {
- log.Error("error to parse bson raw to documentInfo",
err)
- return
- }
- if service.Service == nil {
- log.Error(fmt.Sprintf("unexpect service value,the
documentID is %s", resource.DocumentID), nil)
- return
- }
- resource.Key = service.Service.ServiceId
- resource.Value = service
- resource.Index = util.StringJoin([]string{service.Domain,
service.Project, service.Service.ServiceName, service.Service.Version,
service.Service.AppId, service.Service.Environment}, "/")
- default:
- return
- }
-
- return
-}
-
func (lw *mongoListWatch) ResumeToken() bson.Raw {
return lw.resumeToken
}
@@ -177,10 +131,10 @@ func (lw *mongoListWatch) doParseWatchRspToResource(wRsp
*MongoWatchResponse) (r
switch wRsp.OperationType {
case deleteOp:
//delete operation has no fullDocumentValue
- resource.DocumentID = wRsp.DocumentKey.ID.Hex()
+ resource.Key = wRsp.DocumentKey.ID.Hex()
return
case insertOp, updateOp, replaceOp:
- return lw.doParseDocumentToResource(wRsp.FullDocument)
+ return lw.parseFunc(wRsp.FullDocument)
default:
log.Warn(fmt.Sprintf("unrecognized operation:%s",
wRsp.OperationType))
}
diff --git a/datasource/mongo/sd/listwatch_test.go
b/datasource/mongo/sd/listwatch_test.go
index f4e8232..a7a681c 100644
--- a/datasource/mongo/sd/listwatch_test.go
+++ b/datasource/mongo/sd/listwatch_test.go
@@ -61,23 +61,35 @@ func TestDoParseWatchRspToMongoInfo(t *testing.T) {
}
ilw := mongoListWatch{
Key: instance,
+ parseFunc: func(doc bson.Raw) (resource sdcommon.Resource) {
+ docID := MongoDocument{}
+ err := bson.Unmarshal(doc, &docID)
+ if err != nil {
+ return
+ }
+ service := model.Instance{}
+ err = bson.Unmarshal(doc, &service)
+ if err != nil {
+ return
+ }
+ resource.Value = service
+ resource.Key = docID.ID.Hex()
+ return
+ },
}
info := ilw.doParseWatchRspToResource(mockWatchRsp)
- assert.Equal(t, documentID.Hex(), info.DocumentID)
- assert.Equal(t, "8064a600438511eb8584fa163e8a81c9", info.Key)
+ assert.Equal(t, documentID.Hex(), info.Key)
// case updateOp
mockWatchRsp.OperationType = updateOp
info = ilw.doParseWatchRspToResource(mockWatchRsp)
- assert.Equal(t, documentID.Hex(), info.DocumentID)
- assert.Equal(t, "8064a600438511eb8584fa163e8a81c9", info.Key)
+ assert.Equal(t, documentID.Hex(), info.Key)
assert.Equal(t, "1608552622",
info.Value.(model.Instance).Instance.ModTimestamp)
// case delete
mockWatchRsp.OperationType = deleteOp
info = ilw.doParseWatchRspToResource(mockWatchRsp)
- assert.Equal(t, documentID.Hex(), info.DocumentID)
- assert.Equal(t, "", info.Key)
+ assert.Equal(t, documentID.Hex(), info.Key)
// case service insertOp
mockWatchRsp = &MongoWatchResponse{OperationType: insertOp,
@@ -86,8 +98,7 @@ func TestDoParseWatchRspToMongoInfo(t *testing.T) {
}
ilw.Key = service
info = ilw.doParseWatchRspToResource(mockWatchRsp)
- assert.Equal(t, documentID.Hex(), info.DocumentID)
- assert.Equal(t, "91afbe0faa9dc1594689139f099eb293b0cd048d", info.Key)
+ assert.Equal(t, documentID.Hex(), info.Key)
}
func TestInnerListWatch_ResumeToken(t *testing.T) {
diff --git a/datasource/mongo/sd/mongo_cache.go
b/datasource/mongo/sd/mongo_cache.go
index 1b6a895..7395d4e 100644
--- a/datasource/mongo/sd/mongo_cache.go
+++ b/datasource/mongo/sd/mongo_cache.go
@@ -17,170 +17,27 @@
package sd
-import (
- "github.com/patrickmn/go-cache"
-)
-
-// MongoCache implements Cache.
-// MongoCache is dedicated to stores service discovery data,
-// e.g. service, instance, lease.
-// the docStore consists of two parts.
-// 1. documentID --> bussinessID
-// 2. bussinessID --> documentID
-// the store consists of two parts.
-// 1. index --> bussinessID list
-// 2. bussinessID --> index
-type MongoCache struct {
- Options *Options
- name string
- store *cache.Cache
- docStore *cache.Cache
- indexStore *cache.Cache
- dirty bool
-}
-
-func (c *MongoCache) Name() string {
- return c.name
-}
-
-func (c *MongoCache) Size() (l int) {
- return c.store.ItemCount()
-}
-
-func (c *MongoCache) Get(id string) (v interface{}) {
- v, _ = c.store.Get(id)
- return
-}
-
-func (c *MongoCache) GetKeyByDocumentID(documentKey string) (id string) {
- if v, f := c.docStore.Get(documentKey); f {
- t, ok := v.(string)
- if ok {
- id = t
- }
- }
- return
-}
-
-func (c *MongoCache) GetDocumentIDByBussinessID(id string) (documentID string)
{
- v, f := c.docStore.Get(id)
- if f {
- if id, ok := v.(string); ok {
- documentID = id
- }
- }
- return
-}
-
-func (c *MongoCache) Put(id string, v interface{}) {
- c.store.Set(id, v, cache.NoExpiration)
-}
-
-func (c *MongoCache) PutDocumentID(id string, documentID string) {
- //store docID-->ID&ID-->docID
- c.docStore.Set(documentID, id, cache.NoExpiration)
- c.docStore.Set(id, documentID, cache.NoExpiration)
+type MongoCache interface {
+ MongoCacheReader
+ Dirty() bool
+ Clear()
+ MarkDirty()
+ ProcessUpdate(event MongoEvent)
+ ProcessDelete(event MongoEvent)
+ isValueNotUpdated(value interface{}, newValue interface{}) bool
}
-func (c *MongoCache) Remove(id string) {
- c.store.Delete(id)
- c.docStore.Delete(id)
- c.indexStore.Delete(id)
-}
-
-func (c *MongoCache) RemoveDocumentID(documentID string) {
- c.docStore.Delete(documentID)
-}
-
-func (c *MongoCache) GetIndexData(index string) (res []string) {
- if p, found := c.indexStore.Get(index); found {
- res, ok := p.([]string)
- if ok {
- return res
- }
- }
- return
-}
+type MongoCacheReader interface {
+ Name() string // The name of implementation
-func (c *MongoCache) GetIndexByBussinessID(id string) (index string) {
- if v, found := c.indexStore.Get(id); found {
- if t, ok := v.(string); ok {
- index = t
- }
- }
- return
-}
+ Size() int // the bytes size of the cache
-func (c *MongoCache) PutIndex(index string, newID string) {
- v, found := c.indexStore.Get(index)
- if !found {
- c.indexStore.Set(index, []string{newID}, cache.NoExpiration)
- } else {
- if ids, ok := v.([]string); ok {
- for _, id := range ids {
- if id == newID {
- return
- }
- }
- ids = append(ids, newID)
- c.indexStore.Set(index, ids, cache.NoExpiration)
- }
- }
- //set id-->index for filterdelete
- c.indexStore.Set(newID, index, cache.NoExpiration)
-}
+ // Get gets a value by docid
+ Get(id string) interface{}
-func (c *MongoCache) RemoveIndex(index string, oldID string) {
- if v, found := c.indexStore.Get(index); found {
- ids, ok := v.([]string)
- if ok {
- var newIDs []string
- for _, id := range ids {
- if id == oldID {
- continue
- }
- newIDs = append(newIDs, id)
- }
- if len(newIDs) == 0 {
- c.indexStore.Delete(index)
- } else {
- c.indexStore.Set(index, newIDs,
cache.NoExpiration)
- }
- }
- }
-}
-
-func (c *MongoCache) MarkDirty() {
- c.dirty = true
-}
-
-func (c *MongoCache) Dirty() bool { return c.dirty }
-
-func (c *MongoCache) Clear() {
- c.dirty = false
- c.store.Flush()
- c.docStore.Flush()
- c.indexStore.Flush()
-}
-
-func (c *MongoCache) ForEach(iter func(k string, v interface{}) (next bool)) {
- items := c.store.Items()
- for k, v := range items {
- if v.Object == nil {
- continue
- }
- if !iter(k, v) {
- break
- }
- }
-}
+ // GetValue gets a value by index
+ GetValue(index string) []interface{}
-func NewMongoCache(name string, options *Options) *MongoCache {
- return &MongoCache{
- Options: options,
- name: name,
- store: cache.New(cache.NoExpiration, 0),
- docStore: cache.New(cache.NoExpiration, 0),
- indexStore: cache.New(cache.NoExpiration, 0),
- }
+ // ForEach executes the given function for each of the k-v
+ ForEach(iter func(k string, v interface{}) (next bool))
}
diff --git a/datasource/mongo/sd/mongo_cacher.go
b/datasource/mongo/sd/mongo_cacher.go
index 0faeeac..986d15f 100644
--- a/datasource/mongo/sd/mongo_cacher.go
+++ b/datasource/mongo/sd/mongo_cacher.go
@@ -26,7 +26,6 @@ import (
rmodel "github.com/go-chassis/cari/discovery"
-
"github.com/apache/servicecomb-service-center/datasource/mongo/client/model"
"github.com/apache/servicecomb-service-center/datasource/sdcommon"
"github.com/apache/servicecomb-service-center/pkg/backoff"
"github.com/apache/servicecomb-service-center/pkg/gopool"
@@ -43,7 +42,7 @@ type MongoCacher struct {
Options *Options
reListCount int
isFirstTime bool
- cache *MongoCache
+ cache MongoCache
ready chan struct{}
lw sdcommon.ListWatch
mux sync.Mutex
@@ -51,7 +50,7 @@ type MongoCacher struct {
goroutine *gopool.Pool
}
-func (c *MongoCacher) Cache() *MongoCache {
+func (c *MongoCacher) Cache() MongoCache {
return c.cache
}
@@ -202,7 +201,6 @@ func (c *MongoCacher) handleEventBus(eventbus
*sdcommon.EventBus) error {
case sdcommon.ActionUpdate:
event = NewMongoEventByResource(resource,
rmodel.EVT_UPDATE)
case sdcommon.ActionDelete:
- resource.Key =
c.cache.GetKeyByDocumentID(resource.DocumentID)
resource.Value = c.cache.Get(resource.Key)
event = NewMongoEventByResource(resource,
rmodel.EVT_DELETE)
}
@@ -247,28 +245,21 @@ func (c *MongoCacher) sync(evts []MongoEvent) {
return
}
- c.onEvents(evts)
+ go c.onEvents(evts)
}
func (c *MongoCacher) filter(infos []*sdcommon.Resource) []MongoEvent {
nc := len(infos)
newStore := make(map[string]interface{}, nc)
- documentIDRecord := make(map[string]string, nc)
- indexRecord := make(map[string]string, nc)
-
for _, info := range infos {
- event := NewMongoEventByResource(info, rmodel.EVT_CREATE)
- newStore[event.ResourceID] = info.Value
- documentIDRecord[event.ResourceID] = info.DocumentID
- indexRecord[event.ResourceID] = info.Index
+ newStore[info.Key] = info.Value
}
filterStopCh := make(chan struct{})
eventsCh := make(chan [sdcommon.EventBlockSize]MongoEvent, 2)
-
go c.filterDelete(newStore, eventsCh, filterStopCh)
- go c.filterCreateOrUpdate(newStore, documentIDRecord, indexRecord,
eventsCh, filterStopCh)
+ go c.filterCreateOrUpdate(newStore, eventsCh, filterStopCh)
events := make([]MongoEvent, 0, nc)
for block := range eventsCh {
@@ -303,9 +294,7 @@ func (c *MongoCacher) filterDelete(newStore
map[string]interface{},
i = 0
}
- documentID := c.cache.GetDocumentIDByBussinessID(k)
- index := c.cache.GetIndexByBussinessID(k)
- block[i] = NewMongoEvent(k, documentID, index,
rmodel.EVT_DELETE, v)
+ block[i] = NewMongoEvent(k, rmodel.EVT_DELETE, v)
i++
return
})
@@ -317,8 +306,7 @@ func (c *MongoCacher) filterDelete(newStore
map[string]interface{},
close(filterStopCh)
}
-func (c *MongoCacher) filterCreateOrUpdate(newStore map[string]interface{},
newDocumentStore map[string]string, indexRecord map[string]string,
- eventsCh chan [sdcommon.EventBlockSize]MongoEvent, filterStopCh chan
struct{}) {
+func (c *MongoCacher) filterCreateOrUpdate(newStore map[string]interface{},
eventsCh chan [sdcommon.EventBlockSize]MongoEvent, filterStopCh chan struct{}) {
var block [sdcommon.EventBlockSize]MongoEvent
i := 0
@@ -331,13 +319,13 @@ func (c *MongoCacher) filterCreateOrUpdate(newStore
map[string]interface{}, newD
i = 0
}
- block[i] = NewMongoEvent(k, newDocumentStore[k],
indexRecord[k], rmodel.EVT_CREATE, v)
+ block[i] = NewMongoEvent(k, rmodel.EVT_CREATE, v)
i++
continue
}
- if c.isValueNotUpdated(v, ov) {
+ if c.cache.isValueNotUpdated(v, ov) {
continue
}
@@ -348,8 +336,7 @@ func (c *MongoCacher) filterCreateOrUpdate(newStore
map[string]interface{}, newD
block = [sdcommon.EventBlockSize]MongoEvent{}
i = 0
}
-
- block[i] = NewMongoEvent(k, newDocumentStore[k],
indexRecord[k], rmodel.EVT_UPDATE, v)
+ block[i] = NewMongoEvent(k, rmodel.EVT_UPDATE, v)
i++
}
@@ -362,36 +349,6 @@ func (c *MongoCacher) filterCreateOrUpdate(newStore
map[string]interface{}, newD
close(eventsCh)
}
-func (c *MongoCacher) isValueNotUpdated(value interface{}, newValue
interface{}) bool {
- var modTime string
- var newModTime string
-
- switch c.Options.Key {
- case instance:
- instance := value.(model.Instance)
- newInstance := newValue.(model.Instance)
- if instance.Instance == nil || newInstance.Instance == nil {
- return true
- }
- modTime = instance.Instance.ModTimestamp
- newModTime = newInstance.Instance.ModTimestamp
- case service:
- service := value.(model.Service)
- newService := newValue.(model.Service)
- if service.Service == nil || newService.Service == nil {
- return true
- }
- modTime = service.Service.ModTimestamp
- newModTime = newService.Service.ModTimestamp
- }
-
- if newModTime == "" || modTime == newModTime {
- return true
- }
-
- return false
-}
-
func (c *MongoCacher) onEvents(events []MongoEvent) {
c.buildCache(events)
@@ -400,7 +357,7 @@ func (c *MongoCacher) onEvents(events []MongoEvent) {
func (c *MongoCacher) buildCache(events []MongoEvent) {
for i, evt := range events {
- key := evt.ResourceID
+ key := evt.DocumentID
value := c.cache.Get(key)
ok := value != nil
@@ -418,11 +375,7 @@ func (c *MongoCacher) buildCache(events []MongoEvent) {
evt.Type, rmodel.EVT_UPDATE, key))
evt.Type = rmodel.EVT_UPDATE
}
-
- c.cache.Put(key, evt.Value)
- c.cache.PutDocumentID(key, evt.DocumentID)
- c.cache.PutIndex(evt.Index, evt.ResourceID)
-
+ c.cache.ProcessUpdate(evt)
events[i] = evt
case rmodel.EVT_DELETE:
if !ok {
@@ -430,10 +383,7 @@ func (c *MongoCacher) buildCache(events []MongoEvent) {
evt.Type, key))
} else {
evt.Value = value
-
- c.cache.Remove(key)
- c.cache.RemoveDocumentID(evt.DocumentID)
- c.cache.RemoveIndex(evt.Index, evt.ResourceID)
+ c.cache.ProcessDelete(evt)
}
events[i] = evt
}
@@ -451,21 +401,22 @@ func (c *MongoCacher) notify(evts []MongoEvent) {
for _, evt := range evts {
if evt.Type == rmodel.EVT_DELETE && evt.Value == nil {
- log.Warn(fmt.Sprintf("caught delete event:%s, but value
can't get from caches, it may be deleted by last list", evt.ResourceID))
+ log.Warn(fmt.Sprintf("caught delete event:%s, but value
can't get from caches, it may be deleted by last list", evt.DocumentID))
continue
}
eventProxy.OnEvent(evt)
}
}
-func NewMongoCacher(options *Options, cache *MongoCache) *MongoCacher {
+func NewMongoCacher(options *Options, cache MongoCache, pf parsefunc)
*MongoCacher {
return &MongoCacher{
Options: options,
isFirstTime: true,
cache: cache,
ready: make(chan struct{}),
lw: &mongoListWatch{
- Key: options.Key,
+ Key: options.Key,
+ parseFunc: pf,
},
goroutine: gopool.New(context.Background()),
}
diff --git a/datasource/mongo/sd/mongo_cacher_test.go
b/datasource/mongo/sd/mongo_cacher_test.go
deleted file mode 100644
index 6fc9870..0000000
--- a/datasource/mongo/sd/mongo_cacher_test.go
+++ /dev/null
@@ -1,317 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package sd
-
-import (
- "context"
- "fmt"
- "testing"
-
- pb "github.com/go-chassis/cari/discovery"
- "github.com/stretchr/testify/assert"
- "go.mongodb.org/mongo-driver/bson"
-
-
"github.com/apache/servicecomb-service-center/datasource/mongo/client/model"
- "github.com/apache/servicecomb-service-center/datasource/sdcommon"
- "github.com/apache/servicecomb-service-center/pkg/gopool"
-)
-
-type MockListWatch struct {
- ListResponse *sdcommon.ListWatchResp
- WatchResponse *sdcommon.ListWatchResp
- resumeToken bson.Raw
-}
-
-func (lw *MockListWatch) List(sdcommon.ListWatchConfig)
(*sdcommon.ListWatchResp, error) {
- if lw.ListResponse == nil {
- return nil, fmt.Errorf("list error")
- }
- return lw.ListResponse, nil
-}
-
-func (lw *MockListWatch) DoWatch(ctx context.Context, f
func(*sdcommon.ListWatchResp)) error {
- if lw.WatchResponse == nil {
- return fmt.Errorf("error")
- }
-
- f(lw.WatchResponse)
- <-ctx.Done()
- return nil
-}
-
-func (lw *MockListWatch) EventBus(op sdcommon.ListWatchConfig)
*sdcommon.EventBus {
- return sdcommon.NewEventBus(lw, op)
-}
-
-func TestNewMongoCacher(t *testing.T) {
- mockMongoCache := NewMongoCache("test", DefaultOptions())
- lw := &MockListWatch{}
-
- cr := &MongoCacher{
- Options: DefaultOptions(),
- ready: make(chan struct{}),
- isFirstTime: true,
- lw: lw,
- goroutine: gopool.New(context.Background()),
- cache: mockMongoCache,
- }
-
- ctx, cancel := context.WithCancel(context.Background())
- cancel()
-
- // case: cause list internal error before initialized
- t.Run("case list: internal error before initialized", func(t
*testing.T) {
- cr.refresh(ctx)
- if cr.IsReady() {
- t.Fatalf("TestNewKvCacher failed")
- }
-
- })
-
- // prepare mock data
- var evt MongoEvent
- cr = &MongoCacher{
- Options: DefaultOptions().SetTable(instance),
- ready: make(chan struct{}),
- lw: lw,
- goroutine: gopool.New(context.Background()),
- cache: mockMongoCache,
- }
-
- EventProxy(instance).AddHandleFunc(func(e MongoEvent) {
- evt = e
- })
-
- mockDocumentID := "5fcf2f1a4ea1e6d2f4c61d47"
- mockResourceID := "95cd8dbf3e8411eb92d8fa163e8a81c9"
- mockResumeToken, _ :=
- bson.Marshal(bson.M{"_data":
"825FDB4272000000012B022C0100296E5A10043E2D15AC82D9484C8090E68AF36FED2A46645F696400645FD76265066A6D2DF2AAC8D80004"})
-
- var resources []*sdcommon.Resource
- resource := &sdcommon.Resource{Key: mockResourceID, DocumentID:
mockDocumentID, Value: model.Instance{Domain: "default", Project: "default",
- Instance: &pb.MicroServiceInstance{InstanceId: mockResourceID,
ModTimestamp: "100000"}}}
- resources = append(resources, resource)
- test := &sdcommon.ListWatchResp{
- Action: sdcommon.ActionCreate,
- Resources: resources,
- }
-
- evtExpect := MongoEvent{
- DocumentID: mockDocumentID,
- ResourceID: mockResourceID,
- Value: resource.Value,
- Type: pb.EVT_INIT,
- }
-
- // case: list 1 resp and watch 0 event
- t.Run("case list: first time list init cache", func(t *testing.T) {
- // case: resume token is nil, first time list event is init
- cr.isFirstTime = true
- cr.cache.Remove(mockResourceID)
- cr.cache.RemoveDocumentID(mockDocumentID)
-
- lw.ListResponse = test
- lw.resumeToken = nil
- lw.WatchResponse = nil
-
- cr.refresh(ctx)
-
- // check ready
- assert.Equal(t, true, cr.IsReady())
-
- //check config
- assert.Equal(t, instance, cr.Options.Key)
-
- // check event
- assert.Equal(t, evtExpect, evt)
-
- // check cache
- cache := cr.cache.Get(mockResourceID)
- assert.Equal(t, resource.Value, cache)
- })
-
- t.Run("case list: re-list and updateOp cache", func(t *testing.T) {
- // case: re-list and should be no event
- lw.WatchResponse = nil
- evt.Value = nil
- cr.refresh(ctx)
-
- //check events
- assert.Nil(t, evt.Value)
-
- // check cache
- cache := cr.cache.Get(mockResourceID)
- assert.Equal(t, resource.Value, cache)
-
- // prepare updateOp data
- dataUpdate := &sdcommon.Resource{Key: mockResourceID,
DocumentID: mockDocumentID,
- Value: model.Instance{Domain: "default", Project:
"default",
- Instance: &pb.MicroServiceInstance{InstanceId:
mockResourceID, HostName: "test", ModTimestamp: "100001"}}}
-
- var mongoUpdateResources []*sdcommon.Resource
- mongoUpdateResources = append(mongoUpdateResources, dataUpdate)
- testUpdate := &sdcommon.ListWatchResp{
- Action: sdcommon.ActionUpdate,
- Resources: mongoUpdateResources,
- }
-
- lw.ListResponse = testUpdate
- lw.resumeToken = mockResumeToken
-
- // case: re-list and over no event times, and then event should
be updateOp
- for i := 0; i < sdcommon.DefaultForceListInterval; i++ {
- lw.WatchResponse = nil
- }
-
- evt.Value = nil
- for i := 0; i < sdcommon.DefaultForceListInterval; i++ {
- cr.refresh(ctx)
- }
- // check event
- evtExpect.Type = pb.EVT_UPDATE
- evtExpect.Value = dataUpdate.Value
- assert.Equal(t, evtExpect, evt)
-
- // check cache
- cache = cr.cache.Get(mockResourceID)
- assert.Equal(t, dataUpdate.Value, cache)
- })
-
- t.Run("case list: no infos list and delete cache", func(t *testing.T) {
- // case: no infos list, event should be delete
- lw.ListResponse = &sdcommon.ListWatchResp{}
- for i := 0; i < sdcommon.DefaultForceListInterval; i++ {
- lw.WatchResponse = nil
- }
- evt.Value = nil
- for i := 0; i < sdcommon.DefaultForceListInterval; i++ {
- cr.refresh(ctx)
- }
-
- // check event
- evtExpect.Type = pb.EVT_DELETE
- assert.Equal(t, evtExpect, evt)
-
- // check cache
- cache := cr.cache.Get(mockResourceID)
- assert.Nil(t, nil, cache)
- })
-
- t.Run("case list: mark cache dirty and reset cache", func(t *testing.T)
{
- lw.ListResponse = test
- cr.isFirstTime = true
- evt.Value = nil
-
- cr.cache.MarkDirty()
- cr.refresh(ctx)
-
- // check event
- if evt.Value != nil {
- t.Fatalf("TestNewMongoCacher failed, %v", evt)
- }
-
- // check cache
- cache := cr.cache.Get(mockResourceID)
- assert.Equal(t, resource.Value, cache)
- })
-
- t.Run("case watch: caught create event", func(t *testing.T) {
- cr.cache.Remove(mockResourceID)
- cr.cache.RemoveDocumentID(mockDocumentID)
- lw.WatchResponse = test
- lw.ListResponse = nil
- lw.resumeToken = mockResumeToken
-
- cr.refresh(ctx)
-
- // check event
- evtExpect.Type = pb.EVT_CREATE
- evtExpect.Value = resource.Value
- assert.Equal(t, evtExpect, evt)
-
- // check cache
- cache := cr.cache.Get(mockResourceID)
- assert.Equal(t, resource.Value, cache)
-
- })
-
- t.Run("case watch: caught updateOp event", func(t *testing.T) {
- // prepare updateOp data
- dataUpdate := &sdcommon.Resource{Key: mockResourceID,
DocumentID: mockDocumentID,
- Value: model.Instance{Domain: "default", Project:
"default",
- Instance: &pb.MicroServiceInstance{InstanceId:
mockResourceID, HostName: "test", ModTimestamp: "100001"}}}
-
- var mongoUpdateResources []*sdcommon.Resource
- mongoUpdateResources = append(mongoUpdateResources, dataUpdate)
- testUpdate := &sdcommon.ListWatchResp{
- Action: sdcommon.ActionUpdate,
- Resources: mongoUpdateResources,
- }
- lw.WatchResponse = testUpdate
- lw.ListResponse = nil
-
- cr.refresh(ctx)
-
- // check event
- evtExpect.Type = pb.EVT_UPDATE
- evtExpect.Value = dataUpdate.Value
- assert.Equal(t, evtExpect, evt)
-
- // check cache
- cache := cr.cache.Get(mockResourceID)
- assert.Equal(t, dataUpdate.Value, cache)
- })
-
- t.Run("case watch: caught delete event but value is nil", func(t
*testing.T) {
- test.Action = sdcommon.ActionDelete
- lw.WatchResponse = test
- lw.ListResponse = nil
-
- cr.refresh(ctx)
-
- // check event
- evtExpect.Type = pb.EVT_DELETE
- assert.Equal(t, evtExpect, evt)
-
- // check cache
- cache := cr.cache.Get(mockResourceID)
- assert.Nil(t, cache)
-
- })
-}
-
-func TestMongoCacher_Run(t *testing.T) {
- lw := &MockListWatch{}
-
- mockMongoCache := NewMongoCache("test", DefaultOptions())
- cr := &MongoCacher{
- Options: DefaultOptions().SetTable(instance),
- ready: make(chan struct{}),
- lw: lw,
- goroutine: gopool.New(context.Background()),
- cache: mockMongoCache,
- }
-
- cr.Run()
-
- // check cache
- cache := cr.cache
- assert.NotNil(t, cache)
-
- cr.Stop()
-}
diff --git a/datasource/mongo/sd/options_test.go
b/datasource/mongo/sd/options_test.go
index 58fc401..7051d2a 100644
--- a/datasource/mongo/sd/options_test.go
+++ b/datasource/mongo/sd/options_test.go
@@ -52,7 +52,6 @@ var mongoEventFunc MongoEventFunc
func mongoEventFuncGet() MongoEventFunc {
fun := func(evt MongoEvent) {
evt.DocumentID = "DocumentID has changed"
- evt.ResourceID = "BusinessID has changed"
evt.Value = 2
evt.Type = discovery.EVT_UPDATE
log.Info("in event func")
diff --git a/datasource/mongo/sd/rule_cache.go
b/datasource/mongo/sd/rule_cache.go
new file mode 100644
index 0000000..5ccbe61
--- /dev/null
+++ b/datasource/mongo/sd/rule_cache.go
@@ -0,0 +1,124 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+ */
+
+package sd
+
+import (
+
"github.com/apache/servicecomb-service-center/datasource/mongo/client/model"
+ "github.com/apache/servicecomb-service-center/datasource/sdcommon"
+ "go.mongodb.org/mongo-driver/bson"
+)
+
+type ruleStore struct {
+ dirty bool
+ d *DocStore
+ indexCache *indexCache
+}
+
+func init() {
+ RegisterCacher(rule, newRuleStore)
+}
+
+func newRuleStore() *MongoCacher {
+ options := DefaultOptions().SetTable(rule)
+ cache := &ruleStore{
+ dirty: false,
+ d: NewDocStore(),
+ indexCache: NewIndexCache(),
+ }
+ ruleUnmarshal := func(doc bson.Raw) (resource sdcommon.Resource) {
+ docID := MongoDocument{}
+ err := bson.Unmarshal(doc, &docID)
+ if err != nil {
+ return
+ }
+ rule := model.Rule{}
+ err = bson.Unmarshal(doc, &rule)
+ if err != nil {
+ return
+ }
+ resource.Value = rule
+ resource.Key = docID.ID.Hex()
+ return
+ }
+ return NewMongoCacher(options, cache, ruleUnmarshal)
+}
+
+func (s *ruleStore) Name() string {
+ return rule
+}
+
+func (s *ruleStore) Size() int {
+ return s.d.Size()
+}
+
+func (s *ruleStore) Get(key string) interface{} {
+ return s.d.Get(key)
+}
+
+func (s *ruleStore) ForEach(iter func(k string, v interface{}) (next bool)) {
+ s.d.ForEach(iter)
+}
+
+func (s *ruleStore) GetValue(index string) []interface{} {
+ docs := s.indexCache.Get(index)
+ res := make([]interface{}, 0, len(docs))
+ for _, v := range docs {
+ res = append(res, s.d.Get(v))
+ }
+ return res
+}
+
+func (s *ruleStore) Dirty() bool {
+ return s.dirty
+}
+
+func (s *ruleStore) MarkDirty() {
+ s.dirty = true
+}
+
+func (s *ruleStore) Clear() {
+ s.dirty = false
+ s.d.store.Flush()
+}
+
+func (s *ruleStore) ProcessUpdate(event MongoEvent) {
+ rule, ok := event.Value.(model.Rule)
+ if !ok {
+ return
+ }
+ s.d.Put(event.DocumentID, event.Value)
+ s.indexCache.Put(genRuleServiceID(rule), event.DocumentID)
+ return
+}
+
+func (s *ruleStore) ProcessDelete(event MongoEvent) {
+ rule, ok := s.d.Get(event.DocumentID).(model.Rule)
+ if !ok {
+ return
+ }
+ s.d.DeleteDoc(event.DocumentID)
+ s.indexCache.Delete(genRuleServiceID(rule), event.DocumentID)
+}
+
+func (s *ruleStore) isValueNotUpdated(value interface{}, newValue interface{})
bool {
+ return false
+}
+
+func genRuleServiceID(rule model.Rule) string {
+ return rule.ServiceID
+}
diff --git a/datasource/mongo/sd/service_cache.go
b/datasource/mongo/sd/service_cache.go
new file mode 100644
index 0000000..3aba3c0
--- /dev/null
+++ b/datasource/mongo/sd/service_cache.go
@@ -0,0 +1,133 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+ */
+
+package sd
+
+import (
+ "strings"
+
+
"github.com/apache/servicecomb-service-center/datasource/mongo/client/model"
+ "github.com/apache/servicecomb-service-center/datasource/sdcommon"
+ "go.mongodb.org/mongo-driver/bson"
+)
+
+type serviceStore struct {
+ dirty bool
+ docCache *DocStore
+ indexCache *indexCache
+}
+
+func init() {
+ RegisterCacher(service, newServiceStore)
+}
+
+func newServiceStore() *MongoCacher {
+ options := DefaultOptions().SetTable(service)
+ cache := &serviceStore{
+ dirty: false,
+ docCache: NewDocStore(),
+ indexCache: NewIndexCache(),
+ }
+ serviceUnmarshal := func(doc bson.Raw) (resource sdcommon.Resource) {
+ docID := MongoDocument{}
+ err := bson.Unmarshal(doc, &docID)
+ if err != nil {
+ return
+ }
+ service := model.Service{}
+ err = bson.Unmarshal(doc, &service)
+ if err != nil {
+ return
+ }
+ resource.Value = service
+ resource.Key = docID.ID.Hex()
+ return
+ }
+ return NewMongoCacher(options, cache, serviceUnmarshal)
+}
+
+func (s *serviceStore) Name() string {
+ return service
+}
+
+func (s *serviceStore) Size() int {
+ return s.docCache.Size()
+}
+
+func (s *serviceStore) Get(key string) interface{} {
+ return s.docCache.Get(key)
+}
+
+func (s *serviceStore) ForEach(iter func(k string, v interface{}) (next bool))
{
+ s.docCache.ForEach(iter)
+}
+
+func (s *serviceStore) GetValue(index string) []interface{} {
+ var docs []string
+ docs = s.indexCache.Get(index)
+ res := make([]interface{}, 0, len(docs))
+ for _, v := range docs {
+ res = append(res, s.docCache.Get(v))
+ }
+ return res
+}
+
+func (s *serviceStore) Dirty() bool {
+ return s.dirty
+}
+
+func (s *serviceStore) MarkDirty() {
+ s.dirty = true
+}
+
+func (s *serviceStore) Clear() {
+ s.dirty = false
+ s.docCache.store.Flush()
+}
+
+func (s *serviceStore) ProcessUpdate(event MongoEvent) {
+ service, ok := event.Value.(model.Service)
+ if !ok {
+ return
+ }
+ s.docCache.Put(event.DocumentID, event.Value)
+ s.indexCache.Put(genServiceID(service), event.DocumentID)
+ s.indexCache.Put(getServiceInfo(service), event.DocumentID)
+ return
+}
+
+func (s *serviceStore) ProcessDelete(event MongoEvent) {
+ service, ok := s.docCache.Get(event.DocumentID).(model.Service)
+ if !ok {
+ return
+ }
+ s.docCache.DeleteDoc(event.DocumentID)
+ s.indexCache.Delete(genServiceID(service), event.DocumentID)
+ s.indexCache.Delete(getServiceInfo(service), event.DocumentID)
+}
+
+func (s *serviceStore) isValueNotUpdated(value interface{}, newValue
interface{}) bool {
+ return false
+}
+
+func genServiceID(svc model.Service) string {
+ return svc.Service.ServiceId
+}
+
+func getServiceInfo(svc model.Service) string {
+ return strings.Join([]string{svc.Domain, svc.Project,
svc.Service.AppId, svc.Service.ServiceName, svc.Service.Version}, "/")
+}
diff --git a/datasource/mongo/sd/servicec_test.go
b/datasource/mongo/sd/servicec_test.go
new file mode 100644
index 0000000..f705ddc
--- /dev/null
+++ b/datasource/mongo/sd/servicec_test.go
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package sd
+
+import (
+ "testing"
+
+
"github.com/apache/servicecomb-service-center/datasource/mongo/client/model"
+ "github.com/go-chassis/cari/discovery"
+ "github.com/stretchr/testify/assert"
+)
+
+var serviceCache *MongoCacher
+
+var svc1 = model.Service{
+ Domain: "default",
+ Project: "default",
+ Tags: nil,
+ Service: &discovery.MicroService{
+ ServiceId: "123456789",
+ AppId: "appid1",
+ ServiceName: "svc1",
+ Version: "1.0",
+ },
+}
+
+var svc2 = model.Service{
+ Domain: "default",
+ Project: "default",
+ Tags: nil,
+ Service: &discovery.MicroService{
+ ServiceId: "987654321",
+ AppId: "appid1",
+ ServiceName: "svc1",
+ Version: "1.0",
+ },
+}
+
+func init() {
+ serviceCache = newServiceStore()
+}
+
+func TestServiceCacheBasicFunc(t *testing.T) {
+ t.Run("init serviceCache,should pass", func(t *testing.T) {
+ serviceCache := newServiceStore()
+ assert.NotNil(t, serviceCache)
+ assert.Equal(t, service, serviceCache.cache.Name())
+ })
+ event1 := MongoEvent{
+ DocumentID: "id1",
+ Value: svc1,
+ }
+ event2 := MongoEvent{
+ DocumentID: "id2",
+ Value: svc2,
+ }
+ t.Run("update&&delete serviceCache, should pass", func(t *testing.T) {
+ serviceCache.cache.ProcessUpdate(event1)
+ assert.Equal(t, serviceCache.cache.Size(), 1)
+ assert.Nil(t, serviceCache.cache.Get("id_not_exist"))
+ assert.Equal(t, svc1.Service.ServiceName,
serviceCache.cache.Get("id1").(model.Service).Service.ServiceName)
+ assert.Len(t,
serviceCache.cache.GetValue("default/default/appid1/svc1/1.0"), 1)
+ serviceCache.cache.ProcessUpdate(event2)
+ assert.Equal(t, serviceCache.cache.Size(), 2)
+ assert.Len(t,
serviceCache.cache.GetValue("default/default/appid1/svc1/1.0"), 2)
+ assert.Len(t, serviceCache.cache.GetValue("987654321"), 1)
+ assert.Len(t, serviceCache.cache.GetValue("123456789"), 1)
+ serviceCache.cache.ProcessDelete(event1)
+ assert.Nil(t, serviceCache.cache.Get("id1"))
+ assert.Len(t,
serviceCache.cache.GetValue("default/default/appid1/svc1/1.0"), 1)
+ serviceCache.cache.ProcessDelete(event2)
+ assert.Len(t,
serviceCache.cache.GetValue("default/default/appid1/svc1/1.0"), 0)
+ assert.Nil(t, serviceCache.cache.Get("id2"))
+ assert.Len(t, serviceCache.cache.GetValue("987654321"), 0)
+ assert.Len(t, serviceCache.cache.GetValue("123456789"), 0)
+ })
+}
diff --git a/datasource/mongo/sd/types.go b/datasource/mongo/sd/types.go
index fe4b21e..a244910 100644
--- a/datasource/mongo/sd/types.go
+++ b/datasource/mongo/sd/types.go
@@ -21,6 +21,7 @@ import (
"github.com/go-chassis/cari/discovery"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/bson/primitive"
+ "sync"
"github.com/apache/servicecomb-service-center/datasource/sdcommon"
)
@@ -28,20 +29,26 @@ import (
const (
service = "service"
instance = "instance"
+ rule = "rule"
+ dep = "dependency"
)
-var (
- Types []string
-)
+type cacherRegisterInitiallizer func() (cacher *MongoCacher)
+
+var cacherRegisterMutex sync.Mutex
+var CacherRegister = make(map[string]cacherRegisterInitiallizer)
-func RegisterType(name string) {
- Types = append(Types, name)
+func RegisterCacher(t string, f cacherRegisterInitiallizer) {
+ cacherRegisterMutex.Lock()
+ defer cacherRegisterMutex.Unlock()
+ if _, exist := CacherRegister[t]; exist {
+ return
+ }
+ CacherRegister[t] = f
}
type MongoEvent struct {
DocumentID string
- ResourceID string
- Index string
Value interface{}
Type discovery.EventType
}
@@ -56,18 +63,14 @@ type MongoEventHandler interface {
func NewMongoEventByResource(resource *sdcommon.Resource, action
discovery.EventType) MongoEvent {
return MongoEvent{
Type: action,
- Index: resource.Index,
Value: resource.Value,
- ResourceID: resource.Key,
- DocumentID: resource.DocumentID,
+ DocumentID: resource.Key,
}
}
-func NewMongoEvent(id string, documentID string, index string, action
discovery.EventType, v interface{}) MongoEvent {
+func NewMongoEvent(documentID string, action discovery.EventType, v
interface{}) MongoEvent {
event := MongoEvent{}
- event.ResourceID = id
event.DocumentID = documentID
- event.Index = index
event.Type = action
event.Value = v
return event
diff --git a/datasource/mongo/sd/typestore.go b/datasource/mongo/sd/typestore.go
index 806e6b2..5997d46 100644
--- a/datasource/mongo/sd/typestore.go
+++ b/datasource/mongo/sd/typestore.go
@@ -33,7 +33,6 @@ var store = &TypeStore{}
func init() {
store.Initialize()
- registerInnerTypes()
}
type TypeStore struct {
@@ -48,11 +47,6 @@ func (s *TypeStore) Initialize() {
s.goroutine = gopool.New(context.Background())
}
-func registerInnerTypes() {
- RegisterType(service)
- RegisterType(instance)
-}
-
func (s *TypeStore) Run() {
s.goroutine.Do(s.store)
s.goroutine.Do(s.autoClearCache)
@@ -60,7 +54,7 @@ func (s *TypeStore) Run() {
func (s *TypeStore) store(ctx context.Context) {
// new all types
- for _, t := range Types {
+ for t, _ := range CacherRegister {
select {
case <-ctx.Done():
return
@@ -84,7 +78,7 @@ func (s *TypeStore) autoClearCache(ctx context.Context) {
case <-ctx.Done():
return
case <-time.After(ttl):
- for _, t := range Types {
+ for t, _ := range CacherRegister {
cache := s.getOrCreateCache(t).Cache()
cache.MarkDirty()
}
@@ -98,15 +92,18 @@ func (s *TypeStore) getOrCreateCache(t string) *MongoCacher
{
if ok {
return cache.(*MongoCacher)
}
-
- options := DefaultOptions().SetTable(t)
-
- cache = NewMongoCache(t, options)
- cacher := NewMongoCacher(options, cache.(*MongoCache))
+ f, ok := CacherRegister[t]
+ if !ok {
+ log.Fatal(fmt.Sprintf("unexpected type store "+t), nil)
+ }
+ cacher := f()
cacher.Run()
s.caches.Put(t, cacher)
return cacher
+
+ s.caches.Put(t, cacher)
+ return cacher
}
func (s *TypeStore) Stop() {
@@ -129,6 +126,8 @@ func (s *TypeStore) Ready() <-chan struct{} {
func (s *TypeStore) TypeCacher(id string) *MongoCacher { return
s.getOrCreateCache(id) }
func (s *TypeStore) Service() *MongoCacher { return
s.TypeCacher(service) }
func (s *TypeStore) Instance() *MongoCacher { return
s.TypeCacher(instance) }
+func (s *TypeStore) Rule() *MongoCacher { return
s.TypeCacher(rule) }
+func (s *TypeStore) Dep() *MongoCacher { return
s.TypeCacher(dep) }
func Store() *TypeStore {
return store
diff --git a/datasource/sdcommon/types.go b/datasource/sdcommon/types.go
index f8db7b6..120f547 100644
--- a/datasource/sdcommon/types.go
+++ b/datasource/sdcommon/types.go
@@ -57,8 +57,6 @@ type ListWatchResp struct {
type Resource struct {
// Key in etcd is prefix, in mongo is resourceId
Key string
- // DocumentID is only for mongo
- DocumentID string
// Index is the index of the resource
Index string