This is an automated email from the ASF dual-hosted git repository.
tianxiaoliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/servicecomb-service-center.git
The following commit(s) were added to refs/heads/master by this push:
new 4f44cb4 [SCB-2094] cache support get instances (#791)
4f44cb4 is described below
commit 4f44cb402e62852715c5ead8df9447ae9b0a22a9
Author: panqian <[email protected]>
AuthorDate: Thu Dec 31 17:19:18 2020 +0800
[SCB-2094] cache support get instances (#791)
---
datasource/mongo/ms.go | 66 ++++++++++----
datasource/mongo/sd/listwatch_inner.go | 3 +
datasource/mongo/sd/mongo_cache.go | 157 +++++++++++++++++++++------------
datasource/mongo/sd/mongo_cacher.go | 19 ++--
datasource/mongo/sd/types.go | 12 +--
datasource/sdcommon/common.go | 2 +-
datasource/sdcommon/types.go | 2 +
7 files changed, 172 insertions(+), 89 deletions(-)
diff --git a/datasource/mongo/ms.go b/datasource/mongo/ms.go
index 3ecc494..6911e66 100644
--- a/datasource/mongo/ms.go
+++ b/datasource/mongo/ms.go
@@ -27,6 +27,7 @@ import (
"strings"
"time"
+ "github.com/apache/servicecomb-service-center/datasource/mongo/sd"
"github.com/apache/servicecomb-service-center/server/plugin/quota"
"go.mongodb.org/mongo-driver/mongo"
@@ -170,7 +171,7 @@ func (ds *DataSource) GetApplications(ctx context.Context,
request *discovery.Ge
func (ds *DataSource) GetService(ctx context.Context, request
*discovery.GetServiceRequest) (
*discovery.GetServiceResponse, error) {
- svc, err := GetService(ctx, GeneratorServiceFilter(ctx,
request.ServiceId))
+ svc, err := GetServiceByID(ctx, request.ServiceId)
if err != nil {
log.Error(fmt.Sprintf("failed to get single service %s from
mongo", request.ServiceId), err)
return &discovery.GetServiceResponse{
@@ -1601,14 +1602,11 @@ func (ds *DataSource) GetInstance(ctx context.Context,
request *discovery.GetOne
}
func (ds *DataSource) GetInstances(ctx context.Context, request
*discovery.GetInstancesRequest) (*discovery.GetInstancesResponse, error) {
- domainProject := util.ParseDomainProject(ctx)
service := &Service{}
var err error
- var serviceIDs []string
if len(request.ConsumerServiceId) > 0 {
- filter := GeneratorServiceFilter(ctx, request.ConsumerServiceId)
- service, err = GetService(ctx, filter)
+ service, err = GetServiceByID(ctx, request.ConsumerServiceId)
if err != nil {
log.Error(fmt.Sprintf("get consumer failed, consumer %s
find provider %s instances",
request.ConsumerServiceId,
request.ProviderServiceId), err)
@@ -1626,8 +1624,7 @@ func (ds *DataSource) GetInstances(ctx context.Context,
request *discovery.GetIn
}
}
- filter := GeneratorServiceFilter(ctx, request.ProviderServiceId)
- provider, err := GetService(ctx, filter)
+ provider, err := GetServiceByID(ctx, request.ProviderServiceId)
if err != nil {
log.Error(fmt.Sprintf("get provider failed, consumer %s find
provider instances %s",
request.ConsumerServiceId, request.ProviderServiceId),
err)
@@ -1650,24 +1647,15 @@ func (ds *DataSource) GetInstances(ctx context.Context,
request *discovery.GetIn
provider.ServiceInfo.ServiceId,
provider.ServiceInfo.Environment, provider.ServiceInfo.AppId,
provider.ServiceInfo.ServiceName, provider.ServiceInfo.Version)
}
- services, err := findServices(ctx,
discovery.MicroServiceToKey(domainProject, provider.ServiceInfo))
- if err != nil {
- log.Error(fmt.Sprintf("get instances failed %s", findFlag()),
err)
- return &discovery.GetInstancesResponse{
- Response:
discovery.CreateResponse(discovery.ErrInternal, err.Error()),
- }, err
- }
- if services != nil {
- serviceIDs = filterServiceIDs(ctx, request.ConsumerServiceId,
request.Tags, services)
- }
- if services == nil || len(serviceIDs) == 0 {
+ serviceIDs := filterServiceIDs(ctx, request.ConsumerServiceId,
request.Tags, []*Service{provider})
+ if len(serviceIDs) == 0 {
mes := fmt.Errorf("%s failed, provider does not exist",
findFlag())
log.Error("get instances failed", mes)
return &discovery.GetInstancesResponse{
Response:
discovery.CreateResponse(discovery.ErrServiceNotExists, mes.Error()),
}, nil
}
- instances, err := instancesFilter(ctx, serviceIDs)
+ instances, err := GetInstancesByServiceID(ctx,
request.ProviderServiceId)
if err != nil {
log.Error(fmt.Sprintf("get instances failed %s", findFlag()),
err)
return &discovery.GetInstancesResponse{
@@ -2821,3 +2809,43 @@ func GeneratorServiceInstanceFilter(ctx context.Context,
serviceID string) bson.
ColumnProject: project,
StringBuilder([]string{ColumnInstanceInfo, ColumnServiceID}):
serviceID}
}
+
+func GetServiceByID(ctx context.Context, serviceID string) (*Service, error) {
+ cacheService, ok :=
sd.Store().Service().Cache().Get(serviceID).(sd.Service)
+ if !ok {
+ //no service in cache,get it from mongodb
+ return GetService(ctx, GeneratorServiceFilter(ctx, serviceID))
+ }
+ return cacheToService(cacheService), nil
+}
+
+func cacheToService(service sd.Service) *Service {
+ return &Service{
+ Domain: service.Domain,
+ Project: service.Project,
+ Tags: service.Tags,
+ ServiceInfo: service.ServiceInfo,
+ }
+}
+
+func GetInstancesByServiceID(ctx context.Context, serviceID string)
([]*discovery.MicroServiceInstance, error) {
+ var res []*discovery.MicroServiceInstance
+ var cacheUnavailable bool
+ cacheInstances := sd.Store().Instance().Cache().GetIndexData(serviceID)
+ for _, instID := range cacheInstances {
+ inst, ok :=
sd.Store().Instance().Cache().Get(instID).(sd.Instance)
+ if !ok {
+ cacheUnavailable = true
+ break
+ }
+ res = append(res, inst.InstanceInfo)
+ }
+ if cacheUnavailable {
+ res, err := instancesFilter(ctx, []string{serviceID})
+ if err != nil {
+ return nil, err
+ }
+ return res, nil
+ }
+ return res, nil
+}
diff --git a/datasource/mongo/sd/listwatch_inner.go
b/datasource/mongo/sd/listwatch_inner.go
index 2bee2a0..a95c103 100644
--- a/datasource/mongo/sd/listwatch_inner.go
+++ b/datasource/mongo/sd/listwatch_inner.go
@@ -24,6 +24,7 @@ import (
"github.com/apache/servicecomb-service-center/datasource/mongo/client"
"github.com/apache/servicecomb-service-center/datasource/sdcommon"
"github.com/apache/servicecomb-service-center/pkg/log"
+ "github.com/apache/servicecomb-service-center/pkg/util"
"go.mongodb.org/mongo-driver/bson"
md "go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
@@ -131,6 +132,7 @@ func (lw *mongoListWatch)
doParseDocumentToResource(fullDocument bson.Raw) (reso
}
resource.Key = instance.InstanceInfo.InstanceId
resource.Value = instance
+ resource.Index = instance.InstanceInfo.ServiceId
case service:
service := Service{}
err := bson.Unmarshal(fullDocument, &service)
@@ -140,6 +142,7 @@ func (lw *mongoListWatch)
doParseDocumentToResource(fullDocument bson.Raw) (reso
}
resource.Key = service.ServiceInfo.ServiceId
resource.Value = service
+ resource.Index = util.StringJoin([]string{service.Domain,
service.Project, service.ServiceInfo.ServiceName, service.ServiceInfo.Version,
service.ServiceInfo.AppId, service.ServiceInfo.Environment}, "/")
default:
return
}
diff --git a/datasource/mongo/sd/mongo_cache.go
b/datasource/mongo/sd/mongo_cache.go
index 942a3b8..1b6a895 100644
--- a/datasource/mongo/sd/mongo_cache.go
+++ b/datasource/mongo/sd/mongo_cache.go
@@ -18,21 +18,25 @@
package sd
import (
- "sync"
-
- "github.com/apache/servicecomb-service-center/pkg/util"
+ "github.com/patrickmn/go-cache"
)
// MongoCache implements Cache.
// MongoCache is dedicated to stores service discovery data,
// e.g. service, instance, lease.
+// the docStore consists of two parts.
+// 1. documentID --> bussinessID
+// 2. bussinessID --> documentID
+// the store consists of two parts.
+// 1. index --> bussinessID list
+// 2. bussinessID --> index
type MongoCache struct {
- Options *Options
- name string
- store map[string]interface{}
- documentStore map[string]string
- rwMux sync.RWMutex
- dirty bool
+ Options *Options
+ name string
+ store *cache.Cache
+ docStore *cache.Cache
+ indexStore *cache.Cache
+ dirty bool
}
func (c *MongoCache) Name() string {
@@ -40,64 +44,110 @@ func (c *MongoCache) Name() string {
}
func (c *MongoCache) Size() (l int) {
- c.rwMux.RLock()
- l = int(util.Sizeof(c.store))
- c.rwMux.RUnlock()
- return
+ return c.store.ItemCount()
}
func (c *MongoCache) Get(id string) (v interface{}) {
- c.rwMux.RLock()
- if p, ok := c.store[id]; ok {
- v = p
- }
- c.rwMux.RUnlock()
+ v, _ = c.store.Get(id)
return
}
func (c *MongoCache) GetKeyByDocumentID(documentKey string) (id string) {
- c.rwMux.RLock()
- id = c.documentStore[documentKey]
- c.rwMux.RUnlock()
+ if v, f := c.docStore.Get(documentKey); f {
+ t, ok := v.(string)
+ if ok {
+ id = t
+ }
+ }
return
}
-func (c *MongoCache) GetDocumentIDByID(id string) (documentID string) {
- c.rwMux.RLock()
- for k, v := range c.documentStore {
- if v == id {
- documentID = k
- break
+func (c *MongoCache) GetDocumentIDByBussinessID(id string) (documentID string)
{
+ v, f := c.docStore.Get(id)
+ if f {
+ if id, ok := v.(string); ok {
+ documentID = id
}
}
- c.rwMux.RUnlock()
return
}
func (c *MongoCache) Put(id string, v interface{}) {
- c.rwMux.Lock()
- c.store[id] = v
- c.rwMux.Unlock()
+ c.store.Set(id, v, cache.NoExpiration)
}
func (c *MongoCache) PutDocumentID(id string, documentID string) {
- c.rwMux.Lock()
- c.documentStore[documentID] = id
- c.rwMux.Unlock()
+ //store docID-->ID&ID-->docID
+ c.docStore.Set(documentID, id, cache.NoExpiration)
+ c.docStore.Set(id, documentID, cache.NoExpiration)
}
func (c *MongoCache) Remove(id string) {
- c.rwMux.Lock()
- delete(c.store, id)
- c.rwMux.Unlock()
+ c.store.Delete(id)
+ c.docStore.Delete(id)
+ c.indexStore.Delete(id)
}
func (c *MongoCache) RemoveDocumentID(documentID string) {
- c.rwMux.Lock()
+ c.docStore.Delete(documentID)
+}
- delete(c.documentStore, documentID)
+func (c *MongoCache) GetIndexData(index string) (res []string) {
+ if p, found := c.indexStore.Get(index); found {
+ res, ok := p.([]string)
+ if ok {
+ return res
+ }
+ }
+ return
+}
- c.rwMux.Unlock()
+func (c *MongoCache) GetIndexByBussinessID(id string) (index string) {
+ if v, found := c.indexStore.Get(id); found {
+ if t, ok := v.(string); ok {
+ index = t
+ }
+ }
+ return
+}
+
+func (c *MongoCache) PutIndex(index string, newID string) {
+ v, found := c.indexStore.Get(index)
+ if !found {
+ c.indexStore.Set(index, []string{newID}, cache.NoExpiration)
+ } else {
+ if ids, ok := v.([]string); ok {
+ for _, id := range ids {
+ if id == newID {
+ return
+ }
+ }
+ ids = append(ids, newID)
+ c.indexStore.Set(index, ids, cache.NoExpiration)
+ }
+ }
+ //set id-->index for filterdelete
+ c.indexStore.Set(newID, index, cache.NoExpiration)
+}
+
+func (c *MongoCache) RemoveIndex(index string, oldID string) {
+ if v, found := c.indexStore.Get(index); found {
+ ids, ok := v.([]string)
+ if ok {
+ var newIDs []string
+ for _, id := range ids {
+ if id == oldID {
+ continue
+ }
+ newIDs = append(newIDs, id)
+ }
+ if len(newIDs) == 0 {
+ c.indexStore.Delete(index)
+ } else {
+ c.indexStore.Set(index, newIDs,
cache.NoExpiration)
+ }
+ }
+ }
}
func (c *MongoCache) MarkDirty() {
@@ -107,31 +157,30 @@ func (c *MongoCache) MarkDirty() {
func (c *MongoCache) Dirty() bool { return c.dirty }
func (c *MongoCache) Clear() {
- c.rwMux.Lock()
c.dirty = false
- c.store = make(map[string]interface{})
- c.rwMux.Unlock()
+ c.store.Flush()
+ c.docStore.Flush()
+ c.indexStore.Flush()
}
func (c *MongoCache) ForEach(iter func(k string, v interface{}) (next bool)) {
- c.rwMux.RLock()
-loopParent:
- for k, v := range c.store {
- if v == nil {
- continue loopParent
+ items := c.store.Items()
+ for k, v := range items {
+ if v.Object == nil {
+ continue
}
if !iter(k, v) {
- break loopParent
+ break
}
}
- c.rwMux.RUnlock()
}
func NewMongoCache(name string, options *Options) *MongoCache {
return &MongoCache{
- Options: options,
- name: name,
- store: make(map[string]interface{}),
- documentStore: make(map[string]string),
+ Options: options,
+ name: name,
+ store: cache.New(cache.NoExpiration, 0),
+ docStore: cache.New(cache.NoExpiration, 0),
+ indexStore: cache.New(cache.NoExpiration, 0),
}
}
diff --git a/datasource/mongo/sd/mongo_cacher.go
b/datasource/mongo/sd/mongo_cacher.go
index 57d28a6..891f274 100644
--- a/datasource/mongo/sd/mongo_cacher.go
+++ b/datasource/mongo/sd/mongo_cacher.go
@@ -21,10 +21,10 @@ import (
"context"
"errors"
"fmt"
- "github.com/apache/servicecomb-service-center/datasource/sdcommon"
"sync"
"time"
+ "github.com/apache/servicecomb-service-center/datasource/sdcommon"
"github.com/apache/servicecomb-service-center/pkg/backoff"
"github.com/apache/servicecomb-service-center/pkg/gopool"
"github.com/apache/servicecomb-service-center/pkg/log"
@@ -252,11 +252,13 @@ func (c *MongoCacher) filter(infos []*sdcommon.Resource)
[]MongoEvent {
nc := len(infos)
newStore := make(map[string]interface{}, nc)
documentIDRecord := make(map[string]string, nc)
+ indexRecord := make(map[string]string, nc)
for _, info := range infos {
event := NewMongoEventByResource(info, rmodel.EVT_CREATE)
newStore[event.ResourceID] = info.Value
documentIDRecord[event.ResourceID] = info.DocumentID
+ indexRecord[event.ResourceID] = info.Index
}
filterStopCh := make(chan struct{})
@@ -264,7 +266,7 @@ func (c *MongoCacher) filter(infos []*sdcommon.Resource)
[]MongoEvent {
go c.filterDelete(newStore, eventsCh, filterStopCh)
- go c.filterCreateOrUpdate(newStore, documentIDRecord, eventsCh,
filterStopCh)
+ go c.filterCreateOrUpdate(newStore, documentIDRecord, indexRecord,
eventsCh, filterStopCh)
events := make([]MongoEvent, 0, nc)
for block := range eventsCh {
@@ -299,8 +301,9 @@ func (c *MongoCacher) filterDelete(newStore
map[string]interface{},
i = 0
}
- documentID := c.cache.GetDocumentIDByID(k)
- block[i] = NewMongoEvent(k, documentID, rmodel.EVT_DELETE, v)
+ documentID := c.cache.GetDocumentIDByBussinessID(k)
+ index := c.cache.GetIndexByBussinessID(k)
+ block[i] = NewMongoEvent(k, documentID, index,
rmodel.EVT_DELETE, v)
i++
return
})
@@ -312,7 +315,7 @@ func (c *MongoCacher) filterDelete(newStore
map[string]interface{},
close(filterStopCh)
}
-func (c *MongoCacher) filterCreateOrUpdate(newStore map[string]interface{},
newDocumentStore map[string]string,
+func (c *MongoCacher) filterCreateOrUpdate(newStore map[string]interface{},
newDocumentStore map[string]string, indexRecord map[string]string,
eventsCh chan [sdcommon.EventBlockSize]MongoEvent, filterStopCh chan
struct{}) {
var block [sdcommon.EventBlockSize]MongoEvent
i := 0
@@ -326,7 +329,7 @@ func (c *MongoCacher) filterCreateOrUpdate(newStore
map[string]interface{}, newD
i = 0
}
- block[i] = NewMongoEvent(k, newDocumentStore[k],
rmodel.EVT_CREATE, v)
+ block[i] = NewMongoEvent(k, newDocumentStore[k],
indexRecord[k], rmodel.EVT_CREATE, v)
i++
continue
@@ -344,7 +347,7 @@ func (c *MongoCacher) filterCreateOrUpdate(newStore
map[string]interface{}, newD
i = 0
}
- block[i] = NewMongoEvent(k, newDocumentStore[k],
rmodel.EVT_UPDATE, v)
+ block[i] = NewMongoEvent(k, newDocumentStore[k],
indexRecord[k], rmodel.EVT_UPDATE, v)
i++
}
@@ -416,6 +419,7 @@ func (c *MongoCacher) buildCache(events []MongoEvent) {
c.cache.Put(key, evt.Value)
c.cache.PutDocumentID(key, evt.DocumentID)
+ c.cache.PutIndex(evt.Index, evt.ResourceID)
events[i] = evt
case rmodel.EVT_DELETE:
@@ -427,6 +431,7 @@ func (c *MongoCacher) buildCache(events []MongoEvent) {
c.cache.Remove(key)
c.cache.RemoveDocumentID(evt.DocumentID)
+ c.cache.RemoveIndex(evt.Index, evt.ResourceID)
}
events[i] = evt
}
diff --git a/datasource/mongo/sd/types.go b/datasource/mongo/sd/types.go
index 151bf17..881f27b 100644
--- a/datasource/mongo/sd/types.go
+++ b/datasource/mongo/sd/types.go
@@ -18,9 +18,9 @@
package sd
import (
- "github.com/apache/servicecomb-service-center/datasource/sdcommon"
"time"
+ "github.com/apache/servicecomb-service-center/datasource/sdcommon"
"github.com/go-chassis/cari/discovery"
pb "github.com/go-chassis/cari/discovery"
"go.mongodb.org/mongo-driver/bson"
@@ -43,6 +43,7 @@ func RegisterType(name string) {
type MongoEvent struct {
DocumentID string
ResourceID string
+ Index string
Value interface{}
Type discovery.EventType
}
@@ -58,10 +59,11 @@ func NewMongoEventByResource(resource *sdcommon.Resource,
action discovery.Event
return MongoEvent{ResourceID: resource.Key, DocumentID:
resource.DocumentID, Value: resource.Value, Type: action}
}
-func NewMongoEvent(id string, documentID string, action discovery.EventType, v
interface{}) MongoEvent {
+func NewMongoEvent(id string, documentID string, index string, action
discovery.EventType, v interface{}) MongoEvent {
event := MongoEvent{}
event.ResourceID = id
event.DocumentID = documentID
+ event.Index = index
event.Type = action
event.Value = v
return event
@@ -81,12 +83,6 @@ type ResumeToken struct {
Data []byte `bson:"_data"`
}
-type MongoInfo struct {
- DocumentID string
- ResourceID string
- Value interface{}
-}
-
type Service struct {
Domain string
Project string
diff --git a/datasource/sdcommon/common.go b/datasource/sdcommon/common.go
index 0d58674..9666431 100644
--- a/datasource/sdcommon/common.go
+++ b/datasource/sdcommon/common.go
@@ -23,6 +23,6 @@ const (
DefaultMetricsInterval = 30 * time.Second
MinWaitInterval = 1 * time.Second
- EventBlockSize = 1000
+ EventBlockSize = 800
EventBusSize = 1000
)
diff --git a/datasource/sdcommon/types.go b/datasource/sdcommon/types.go
index 93e641b..f8db7b6 100644
--- a/datasource/sdcommon/types.go
+++ b/datasource/sdcommon/types.go
@@ -60,6 +60,8 @@ type Resource struct {
// DocumentID is only for mongo
DocumentID string
+ // Index is the index of the resource
+ Index string
// this is only for etcd
CreateRevision int64
ModRevision int64