This is an automated email from the ASF dual-hosted git repository.

tianxiaoliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/servicecomb-service-center.git


The following commit(s) were added to refs/heads/master by this push:
     new 4f44cb4   [SCB-2094] cache support get instances (#791)
4f44cb4 is described below

commit 4f44cb402e62852715c5ead8df9447ae9b0a22a9
Author: panqian <[email protected]>
AuthorDate: Thu Dec 31 17:19:18 2020 +0800

     [SCB-2094] cache support get instances (#791)
---
 datasource/mongo/ms.go                 |  66 ++++++++++----
 datasource/mongo/sd/listwatch_inner.go |   3 +
 datasource/mongo/sd/mongo_cache.go     | 157 +++++++++++++++++++++------------
 datasource/mongo/sd/mongo_cacher.go    |  19 ++--
 datasource/mongo/sd/types.go           |  12 +--
 datasource/sdcommon/common.go          |   2 +-
 datasource/sdcommon/types.go           |   2 +
 7 files changed, 172 insertions(+), 89 deletions(-)

diff --git a/datasource/mongo/ms.go b/datasource/mongo/ms.go
index 3ecc494..6911e66 100644
--- a/datasource/mongo/ms.go
+++ b/datasource/mongo/ms.go
@@ -27,6 +27,7 @@ import (
        "strings"
        "time"
 
+       "github.com/apache/servicecomb-service-center/datasource/mongo/sd"
        "github.com/apache/servicecomb-service-center/server/plugin/quota"
        "go.mongodb.org/mongo-driver/mongo"
 
@@ -170,7 +171,7 @@ func (ds *DataSource) GetApplications(ctx context.Context, 
request *discovery.Ge
 
 func (ds *DataSource) GetService(ctx context.Context, request 
*discovery.GetServiceRequest) (
        *discovery.GetServiceResponse, error) {
-       svc, err := GetService(ctx, GeneratorServiceFilter(ctx, 
request.ServiceId))
+       svc, err := GetServiceByID(ctx, request.ServiceId)
        if err != nil {
                log.Error(fmt.Sprintf("failed to get single service %s from 
mongo", request.ServiceId), err)
                return &discovery.GetServiceResponse{
@@ -1601,14 +1602,11 @@ func (ds *DataSource) GetInstance(ctx context.Context, 
request *discovery.GetOne
 }
 
 func (ds *DataSource) GetInstances(ctx context.Context, request 
*discovery.GetInstancesRequest) (*discovery.GetInstancesResponse, error) {
-       domainProject := util.ParseDomainProject(ctx)
        service := &Service{}
        var err error
-       var serviceIDs []string
 
        if len(request.ConsumerServiceId) > 0 {
-               filter := GeneratorServiceFilter(ctx, request.ConsumerServiceId)
-               service, err = GetService(ctx, filter)
+               service, err = GetServiceByID(ctx, request.ConsumerServiceId)
                if err != nil {
                        log.Error(fmt.Sprintf("get consumer failed, consumer %s 
find provider %s instances",
                                request.ConsumerServiceId, 
request.ProviderServiceId), err)
@@ -1626,8 +1624,7 @@ func (ds *DataSource) GetInstances(ctx context.Context, 
request *discovery.GetIn
                }
        }
 
-       filter := GeneratorServiceFilter(ctx, request.ProviderServiceId)
-       provider, err := GetService(ctx, filter)
+       provider, err := GetServiceByID(ctx, request.ProviderServiceId)
        if err != nil {
                log.Error(fmt.Sprintf("get provider failed, consumer %s find 
provider instances %s",
                        request.ConsumerServiceId, request.ProviderServiceId), 
err)
@@ -1650,24 +1647,15 @@ func (ds *DataSource) GetInstances(ctx context.Context, 
request *discovery.GetIn
                        provider.ServiceInfo.ServiceId, 
provider.ServiceInfo.Environment, provider.ServiceInfo.AppId, 
provider.ServiceInfo.ServiceName, provider.ServiceInfo.Version)
        }
 
-       services, err := findServices(ctx, 
discovery.MicroServiceToKey(domainProject, provider.ServiceInfo))
-       if err != nil {
-               log.Error(fmt.Sprintf("get instances failed %s", findFlag()), 
err)
-               return &discovery.GetInstancesResponse{
-                       Response: 
discovery.CreateResponse(discovery.ErrInternal, err.Error()),
-               }, err
-       }
-       if services != nil {
-               serviceIDs = filterServiceIDs(ctx, request.ConsumerServiceId, 
request.Tags, services)
-       }
-       if services == nil || len(serviceIDs) == 0 {
+       serviceIDs := filterServiceIDs(ctx, request.ConsumerServiceId, 
request.Tags, []*Service{provider})
+       if len(serviceIDs) == 0 {
                mes := fmt.Errorf("%s failed, provider does not exist", 
findFlag())
                log.Error("get instances failed", mes)
                return &discovery.GetInstancesResponse{
                        Response: 
discovery.CreateResponse(discovery.ErrServiceNotExists, mes.Error()),
                }, nil
        }
-       instances, err := instancesFilter(ctx, serviceIDs)
+       instances, err := GetInstancesByServiceID(ctx, 
request.ProviderServiceId)
        if err != nil {
                log.Error(fmt.Sprintf("get instances failed %s", findFlag()), 
err)
                return &discovery.GetInstancesResponse{
@@ -2821,3 +2809,43 @@ func GeneratorServiceInstanceFilter(ctx context.Context, 
serviceID string) bson.
                ColumnProject: project,
                StringBuilder([]string{ColumnInstanceInfo, ColumnServiceID}): 
serviceID}
 }
+
+func GetServiceByID(ctx context.Context, serviceID string) (*Service, error) {
+       cacheService, ok := 
sd.Store().Service().Cache().Get(serviceID).(sd.Service)
+       if !ok {
+               //no service in cache,get it from mongodb
+               return GetService(ctx, GeneratorServiceFilter(ctx, serviceID))
+       }
+       return cacheToService(cacheService), nil
+}
+
+func cacheToService(service sd.Service) *Service {
+       return &Service{
+               Domain:      service.Domain,
+               Project:     service.Project,
+               Tags:        service.Tags,
+               ServiceInfo: service.ServiceInfo,
+       }
+}
+
+func GetInstancesByServiceID(ctx context.Context, serviceID string) 
([]*discovery.MicroServiceInstance, error) {
+       var res []*discovery.MicroServiceInstance
+       var cacheUnavailable bool
+       cacheInstances := sd.Store().Instance().Cache().GetIndexData(serviceID)
+       for _, instID := range cacheInstances {
+               inst, ok := 
sd.Store().Instance().Cache().Get(instID).(sd.Instance)
+               if !ok {
+                       cacheUnavailable = true
+                       break
+               }
+               res = append(res, inst.InstanceInfo)
+       }
+       if cacheUnavailable {
+               res, err := instancesFilter(ctx, []string{serviceID})
+               if err != nil {
+                       return nil, err
+               }
+               return res, nil
+       }
+       return res, nil
+}
diff --git a/datasource/mongo/sd/listwatch_inner.go 
b/datasource/mongo/sd/listwatch_inner.go
index 2bee2a0..a95c103 100644
--- a/datasource/mongo/sd/listwatch_inner.go
+++ b/datasource/mongo/sd/listwatch_inner.go
@@ -24,6 +24,7 @@ import (
        "github.com/apache/servicecomb-service-center/datasource/mongo/client"
        "github.com/apache/servicecomb-service-center/datasource/sdcommon"
        "github.com/apache/servicecomb-service-center/pkg/log"
+       "github.com/apache/servicecomb-service-center/pkg/util"
        "go.mongodb.org/mongo-driver/bson"
        md "go.mongodb.org/mongo-driver/mongo"
        "go.mongodb.org/mongo-driver/mongo/options"
@@ -131,6 +132,7 @@ func (lw *mongoListWatch) 
doParseDocumentToResource(fullDocument bson.Raw) (reso
                }
                resource.Key = instance.InstanceInfo.InstanceId
                resource.Value = instance
+               resource.Index = instance.InstanceInfo.ServiceId
        case service:
                service := Service{}
                err := bson.Unmarshal(fullDocument, &service)
@@ -140,6 +142,7 @@ func (lw *mongoListWatch) 
doParseDocumentToResource(fullDocument bson.Raw) (reso
                }
                resource.Key = service.ServiceInfo.ServiceId
                resource.Value = service
+               resource.Index = util.StringJoin([]string{service.Domain, 
service.Project, service.ServiceInfo.ServiceName, service.ServiceInfo.Version, 
service.ServiceInfo.AppId, service.ServiceInfo.Environment}, "/")
        default:
                return
        }
diff --git a/datasource/mongo/sd/mongo_cache.go 
b/datasource/mongo/sd/mongo_cache.go
index 942a3b8..1b6a895 100644
--- a/datasource/mongo/sd/mongo_cache.go
+++ b/datasource/mongo/sd/mongo_cache.go
@@ -18,21 +18,25 @@
 package sd
 
 import (
-       "sync"
-
-       "github.com/apache/servicecomb-service-center/pkg/util"
+       "github.com/patrickmn/go-cache"
 )
 
 // MongoCache implements Cache.
 // MongoCache is dedicated to stores service discovery data,
 // e.g. service, instance, lease.
+// the docStore consists of two parts.
+// 1. documentID --> business ID
+// 2. business ID --> documentID
+// the indexStore consists of two parts.
+// 1. index --> business ID list
+// 2. business ID --> index
 type MongoCache struct {
-       Options       *Options
-       name          string
-       store         map[string]interface{}
-       documentStore map[string]string
-       rwMux         sync.RWMutex
-       dirty         bool
+       Options    *Options
+       name       string
+       store      *cache.Cache
+       docStore   *cache.Cache
+       indexStore *cache.Cache
+       dirty      bool
 }
 
 func (c *MongoCache) Name() string {
@@ -40,64 +44,110 @@ func (c *MongoCache) Name() string {
 }
 
 func (c *MongoCache) Size() (l int) {
-       c.rwMux.RLock()
-       l = int(util.Sizeof(c.store))
-       c.rwMux.RUnlock()
-       return
+       return c.store.ItemCount()
 }
 
 func (c *MongoCache) Get(id string) (v interface{}) {
-       c.rwMux.RLock()
-       if p, ok := c.store[id]; ok {
-               v = p
-       }
-       c.rwMux.RUnlock()
+       v, _ = c.store.Get(id)
        return
 }
 
 func (c *MongoCache) GetKeyByDocumentID(documentKey string) (id string) {
-       c.rwMux.RLock()
-       id = c.documentStore[documentKey]
-       c.rwMux.RUnlock()
+       if v, f := c.docStore.Get(documentKey); f {
+               t, ok := v.(string)
+               if ok {
+                       id = t
+               }
+       }
        return
 }
 
-func (c *MongoCache) GetDocumentIDByID(id string) (documentID string) {
-       c.rwMux.RLock()
-       for k, v := range c.documentStore {
-               if v == id {
-                       documentID = k
-                       break
+func (c *MongoCache) GetDocumentIDByBussinessID(id string) (documentID string) 
{
+       v, f := c.docStore.Get(id)
+       if f {
+               if id, ok := v.(string); ok {
+                       documentID = id
                }
        }
-       c.rwMux.RUnlock()
        return
 }
 
 func (c *MongoCache) Put(id string, v interface{}) {
-       c.rwMux.Lock()
-       c.store[id] = v
-       c.rwMux.Unlock()
+       c.store.Set(id, v, cache.NoExpiration)
 }
 
 func (c *MongoCache) PutDocumentID(id string, documentID string) {
-       c.rwMux.Lock()
-       c.documentStore[documentID] = id
-       c.rwMux.Unlock()
+       // store both directions: docID --> ID and ID --> docID
+       c.docStore.Set(documentID, id, cache.NoExpiration)
+       c.docStore.Set(id, documentID, cache.NoExpiration)
 }
 
 func (c *MongoCache) Remove(id string) {
-       c.rwMux.Lock()
-       delete(c.store, id)
-       c.rwMux.Unlock()
+       c.store.Delete(id)
+       c.docStore.Delete(id)
+       c.indexStore.Delete(id)
 }
 
 func (c *MongoCache) RemoveDocumentID(documentID string) {
-       c.rwMux.Lock()
+       c.docStore.Delete(documentID)
+}
 
-       delete(c.documentStore, documentID)
+func (c *MongoCache) GetIndexData(index string) (res []string) {
+       if p, found := c.indexStore.Get(index); found {
+               res, ok := p.([]string)
+               if ok {
+                       return res
+               }
+       }
+       return
+}
 
-       c.rwMux.Unlock()
+func (c *MongoCache) GetIndexByBussinessID(id string) (index string) {
+       if v, found := c.indexStore.Get(id); found {
+               if t, ok := v.(string); ok {
+                       index = t
+               }
+       }
+       return
+}
+
+func (c *MongoCache) PutIndex(index string, newID string) {
+       v, found := c.indexStore.Get(index)
+       if !found {
+               c.indexStore.Set(index, []string{newID}, cache.NoExpiration)
+       } else {
+               if ids, ok := v.([]string); ok {
+                       for _, id := range ids {
+                               if id == newID {
+                                       return
+                               }
+                       }
+                       ids = append(ids, newID)
+                       c.indexStore.Set(index, ids, cache.NoExpiration)
+               }
+       }
+       // also store id --> index so filterDelete can look up the index by ID
+       c.indexStore.Set(newID, index, cache.NoExpiration)
+}
+
+func (c *MongoCache) RemoveIndex(index string, oldID string) {
+       if v, found := c.indexStore.Get(index); found {
+               ids, ok := v.([]string)
+               if ok {
+                       var newIDs []string
+                       for _, id := range ids {
+                               if id == oldID {
+                                       continue
+                               }
+                               newIDs = append(newIDs, id)
+                       }
+                       if len(newIDs) == 0 {
+                               c.indexStore.Delete(index)
+                       } else {
+                               c.indexStore.Set(index, newIDs, 
cache.NoExpiration)
+                       }
+               }
+       }
 }
 
 func (c *MongoCache) MarkDirty() {
@@ -107,31 +157,30 @@ func (c *MongoCache) MarkDirty() {
 func (c *MongoCache) Dirty() bool { return c.dirty }
 
 func (c *MongoCache) Clear() {
-       c.rwMux.Lock()
        c.dirty = false
-       c.store = make(map[string]interface{})
-       c.rwMux.Unlock()
+       c.store.Flush()
+       c.docStore.Flush()
+       c.indexStore.Flush()
 }
 
 func (c *MongoCache) ForEach(iter func(k string, v interface{}) (next bool)) {
-       c.rwMux.RLock()
-loopParent:
-       for k, v := range c.store {
-               if v == nil {
-                       continue loopParent
+       items := c.store.Items()
+       for k, v := range items {
+               if v.Object == nil {
+                       continue
                }
                if !iter(k, v) {
-                       break loopParent
+                       break
                }
        }
-       c.rwMux.RUnlock()
 }
 
 func NewMongoCache(name string, options *Options) *MongoCache {
        return &MongoCache{
-               Options:       options,
-               name:          name,
-               store:         make(map[string]interface{}),
-               documentStore: make(map[string]string),
+               Options:    options,
+               name:       name,
+               store:      cache.New(cache.NoExpiration, 0),
+               docStore:   cache.New(cache.NoExpiration, 0),
+               indexStore: cache.New(cache.NoExpiration, 0),
        }
 }
diff --git a/datasource/mongo/sd/mongo_cacher.go 
b/datasource/mongo/sd/mongo_cacher.go
index 57d28a6..891f274 100644
--- a/datasource/mongo/sd/mongo_cacher.go
+++ b/datasource/mongo/sd/mongo_cacher.go
@@ -21,10 +21,10 @@ import (
        "context"
        "errors"
        "fmt"
-       "github.com/apache/servicecomb-service-center/datasource/sdcommon"
        "sync"
        "time"
 
+       "github.com/apache/servicecomb-service-center/datasource/sdcommon"
        "github.com/apache/servicecomb-service-center/pkg/backoff"
        "github.com/apache/servicecomb-service-center/pkg/gopool"
        "github.com/apache/servicecomb-service-center/pkg/log"
@@ -252,11 +252,13 @@ func (c *MongoCacher) filter(infos []*sdcommon.Resource) 
[]MongoEvent {
        nc := len(infos)
        newStore := make(map[string]interface{}, nc)
        documentIDRecord := make(map[string]string, nc)
+       indexRecord := make(map[string]string, nc)
 
        for _, info := range infos {
                event := NewMongoEventByResource(info, rmodel.EVT_CREATE)
                newStore[event.ResourceID] = info.Value
                documentIDRecord[event.ResourceID] = info.DocumentID
+               indexRecord[event.ResourceID] = info.Index
        }
 
        filterStopCh := make(chan struct{})
@@ -264,7 +266,7 @@ func (c *MongoCacher) filter(infos []*sdcommon.Resource) 
[]MongoEvent {
 
        go c.filterDelete(newStore, eventsCh, filterStopCh)
 
-       go c.filterCreateOrUpdate(newStore, documentIDRecord, eventsCh, 
filterStopCh)
+       go c.filterCreateOrUpdate(newStore, documentIDRecord, indexRecord, 
eventsCh, filterStopCh)
 
        events := make([]MongoEvent, 0, nc)
        for block := range eventsCh {
@@ -299,8 +301,9 @@ func (c *MongoCacher) filterDelete(newStore 
map[string]interface{},
                        i = 0
                }
 
-               documentID := c.cache.GetDocumentIDByID(k)
-               block[i] = NewMongoEvent(k, documentID, rmodel.EVT_DELETE, v)
+               documentID := c.cache.GetDocumentIDByBussinessID(k)
+               index := c.cache.GetIndexByBussinessID(k)
+               block[i] = NewMongoEvent(k, documentID, index, 
rmodel.EVT_DELETE, v)
                i++
                return
        })
@@ -312,7 +315,7 @@ func (c *MongoCacher) filterDelete(newStore 
map[string]interface{},
        close(filterStopCh)
 }
 
-func (c *MongoCacher) filterCreateOrUpdate(newStore map[string]interface{}, 
newDocumentStore map[string]string,
+func (c *MongoCacher) filterCreateOrUpdate(newStore map[string]interface{}, 
newDocumentStore map[string]string, indexRecord map[string]string,
        eventsCh chan [sdcommon.EventBlockSize]MongoEvent, filterStopCh chan 
struct{}) {
        var block [sdcommon.EventBlockSize]MongoEvent
        i := 0
@@ -326,7 +329,7 @@ func (c *MongoCacher) filterCreateOrUpdate(newStore 
map[string]interface{}, newD
                                i = 0
                        }
 
-                       block[i] = NewMongoEvent(k, newDocumentStore[k], 
rmodel.EVT_CREATE, v)
+                       block[i] = NewMongoEvent(k, newDocumentStore[k], 
indexRecord[k], rmodel.EVT_CREATE, v)
                        i++
 
                        continue
@@ -344,7 +347,7 @@ func (c *MongoCacher) filterCreateOrUpdate(newStore 
map[string]interface{}, newD
                        i = 0
                }
 
-               block[i] = NewMongoEvent(k, newDocumentStore[k], 
rmodel.EVT_UPDATE, v)
+               block[i] = NewMongoEvent(k, newDocumentStore[k], 
indexRecord[k], rmodel.EVT_UPDATE, v)
                i++
        }
 
@@ -416,6 +419,7 @@ func (c *MongoCacher) buildCache(events []MongoEvent) {
 
                        c.cache.Put(key, evt.Value)
                        c.cache.PutDocumentID(key, evt.DocumentID)
+                       c.cache.PutIndex(evt.Index, evt.ResourceID)
 
                        events[i] = evt
                case rmodel.EVT_DELETE:
@@ -427,6 +431,7 @@ func (c *MongoCacher) buildCache(events []MongoEvent) {
 
                                c.cache.Remove(key)
                                c.cache.RemoveDocumentID(evt.DocumentID)
+                               c.cache.RemoveIndex(evt.Index, evt.ResourceID)
                        }
                        events[i] = evt
                }
diff --git a/datasource/mongo/sd/types.go b/datasource/mongo/sd/types.go
index 151bf17..881f27b 100644
--- a/datasource/mongo/sd/types.go
+++ b/datasource/mongo/sd/types.go
@@ -18,9 +18,9 @@
 package sd
 
 import (
-       "github.com/apache/servicecomb-service-center/datasource/sdcommon"
        "time"
 
+       "github.com/apache/servicecomb-service-center/datasource/sdcommon"
        "github.com/go-chassis/cari/discovery"
        pb "github.com/go-chassis/cari/discovery"
        "go.mongodb.org/mongo-driver/bson"
@@ -43,6 +43,7 @@ func RegisterType(name string) {
 type MongoEvent struct {
        DocumentID string
        ResourceID string
+       Index      string
        Value      interface{}
        Type       discovery.EventType
 }
@@ -58,10 +59,11 @@ func NewMongoEventByResource(resource *sdcommon.Resource, 
action discovery.Event
        return MongoEvent{ResourceID: resource.Key, DocumentID: 
resource.DocumentID, Value: resource.Value, Type: action}
 }
 
-func NewMongoEvent(id string, documentID string, action discovery.EventType, v 
interface{}) MongoEvent {
+func NewMongoEvent(id string, documentID string, index string, action 
discovery.EventType, v interface{}) MongoEvent {
        event := MongoEvent{}
        event.ResourceID = id
        event.DocumentID = documentID
+       event.Index = index
        event.Type = action
        event.Value = v
        return event
@@ -81,12 +83,6 @@ type ResumeToken struct {
        Data []byte `bson:"_data"`
 }
 
-type MongoInfo struct {
-       DocumentID string
-       ResourceID string
-       Value      interface{}
-}
-
 type Service struct {
        Domain      string
        Project     string
diff --git a/datasource/sdcommon/common.go b/datasource/sdcommon/common.go
index 0d58674..9666431 100644
--- a/datasource/sdcommon/common.go
+++ b/datasource/sdcommon/common.go
@@ -23,6 +23,6 @@ const (
        DefaultMetricsInterval   = 30 * time.Second
 
        MinWaitInterval = 1 * time.Second
-       EventBlockSize  = 1000
+       EventBlockSize  = 800
        EventBusSize    = 1000
 )
diff --git a/datasource/sdcommon/types.go b/datasource/sdcommon/types.go
index 93e641b..f8db7b6 100644
--- a/datasource/sdcommon/types.go
+++ b/datasource/sdcommon/types.go
@@ -60,6 +60,8 @@ type Resource struct {
        // DocumentID is only for mongo
        DocumentID string
 
+       // Index is the index of the resource
+       Index string
        // this is only for etcd
        CreateRevision int64
        ModRevision    int64

Reply via email to