This is an automated email from the ASF dual-hosted git repository.

littlecui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/servicecomb-kie.git


The following commit(s) were added to refs/heads/master by this push:
     new 3e0f1fb  [merge]merge dev to master (#317)
3e0f1fb is described below

commit 3e0f1fb1770c4840f3bd9c52034a53ba9c64457d
Author: tornado-ssy <[email protected]>
AuthorDate: Thu Jan 25 21:15:24 2024 +0800

    [merge]merge dev to master (#317)
    
    * fix the concurrent bug of KvIdCache
    
    * fix the bug of not reporting the error which occurred while getting 
kvdocs from etcd
    
    * fix the bug of not reporting the error which occurred while getting 
kvdocs from etcd
    
    * resolve conflicts in master
    
    * [fix] fix golangci-lint (#318)
    
    Co-authored-by: songshiyuan 00649746 <[email protected]>
    
    * fix the concurrent bug of KvIdCache
    
    * fix the bug of not reporting the error which occurred while getting 
kvdocs from etcd
    
    * fix the bug of not reporting the error which occurred while getting 
kvdocs from etcd
    
    * resolve conflicts in master
    
    ---------
    
    Co-authored-by: songshiyuan 00649746 <[email protected]>
---
 server/datasource/etcd/kv/kv_cache.go | 62 ++++++++++++++++++++---------------
 server/datasource/etcd/kv/kv_dao.go   |  7 ++--
 2 files changed, 41 insertions(+), 28 deletions(-)

diff --git a/server/datasource/etcd/kv/kv_cache.go 
b/server/datasource/etcd/kv/kv_cache.go
index 61d017b..7cf6529 100644
--- a/server/datasource/etcd/kv/kv_cache.go
+++ b/server/datasource/etcd/kv/kv_cache.go
@@ -9,15 +9,16 @@ import (
        "sync"
        "time"
 
-       "github.com/apache/servicecomb-kie/pkg/model"
-       "github.com/apache/servicecomb-kie/pkg/stringutil"
-       "github.com/apache/servicecomb-kie/server/datasource"
-       "github.com/apache/servicecomb-kie/server/datasource/etcd/key"
        "github.com/go-chassis/foundation/backoff"
        "github.com/go-chassis/openlog"
        "github.com/little-cui/etcdadpt"
        goCache "github.com/patrickmn/go-cache"
        "go.etcd.io/etcd/api/v3/mvccpb"
+
+       "github.com/apache/servicecomb-kie/pkg/model"
+       "github.com/apache/servicecomb-kie/pkg/stringutil"
+       "github.com/apache/servicecomb-kie/server/datasource"
+       "github.com/apache/servicecomb-kie/server/datasource/etcd/key"
 )
 
 func Init() {
@@ -35,8 +36,6 @@ const (
        backOffMinInterval   = 5 * time.Second
 )
 
-type IDSet map[string]struct{}
-
 type Cache struct {
        timeOut    time.Duration
        client     etcdadpt.Client
@@ -158,11 +157,13 @@ func (kc *Cache) cachePut(rsp *etcdadpt.Response) {
                cacheKey := kc.GetCacheKey(kvDoc.Domain, kvDoc.Project, 
kvDoc.Labels)
                m, ok := kc.LoadKvIDSet(cacheKey)
                if !ok {
-                       kc.StoreKvIDSet(cacheKey, IDSet{kvDoc.ID: struct{}{}})
+                       z := &sync.Map{}
+                       z.Store(kvDoc.ID, struct{}{})
+                       kc.StoreKvIDSet(cacheKey, z)
                        openlog.Info("cacheKey " + cacheKey + "not exists")
                        continue
                }
-               m[kvDoc.ID] = struct{}{}
+               m.Store(kvDoc.ID, struct{}{})
        }
 }
 
@@ -180,23 +181,23 @@ func (kc *Cache) cacheDelete(rsp *etcdadpt.Response) {
                        openlog.Error("cacheKey " + cacheKey + "not exists")
                        continue
                }
-               delete(m, kvDoc.ID)
+               m.Delete(kvDoc.ID)
        }
 }
 
-func (kc *Cache) LoadKvIDSet(cacheKey string) (IDSet, bool) {
+func (kc *Cache) LoadKvIDSet(cacheKey string) (*sync.Map, bool) {
        val, ok := kc.kvIDCache.Load(cacheKey)
        if !ok {
                return nil, false
        }
-       kvIds, ok := val.(IDSet)
+       kvIds, ok := val.(*sync.Map)
        if !ok {
                return nil, false
        }
        return kvIds, true
 }
 
-func (kc *Cache) StoreKvIDSet(cacheKey string, kvIds IDSet) {
+func (kc *Cache) StoreKvIDSet(cacheKey string, kvIds *sync.Map) {
        kc.kvIDCache.Store(cacheKey, kvIds)
 }
 
@@ -220,9 +221,9 @@ func (kc *Cache) DeleteKvDoc(kvID string) {
        kc.kvDocCache.Delete(kvID)
 }
 
-func Search(ctx context.Context, req *CacheSearchReq) (*model.KVResponse, 
bool) {
+func Search(ctx context.Context, req *CacheSearchReq) (*model.KVResponse, 
bool, error) {
        if !req.Opts.ExactLabels {
-               return nil, false
+               return nil, false, nil
        }
 
        openlog.Debug(fmt.Sprintf("using cache to search kv, domain %v, project 
%v, opts %+v", req.Domain, req.Project, *req.Opts))
@@ -232,22 +233,25 @@ func Search(ctx context.Context, req *CacheSearchReq) 
(*model.KVResponse, bool)
        cacheKey := kvCache.GetCacheKey(req.Domain, req.Project, 
req.Opts.Labels)
        kvIds, ok := kvCache.LoadKvIDSet(cacheKey)
        if !ok {
-               kvCache.StoreKvIDSet(cacheKey, IDSet{})
-               return result, true
+               kvCache.StoreKvIDSet(cacheKey, &sync.Map{})
+               return result, true, nil
        }
 
        var docs []*model.KVDoc
 
        var kvIdsLeft []string
-       for kvID := range kvIds {
-               if doc, ok := kvCache.LoadKvDoc(kvID); ok {
+       kvIds.Range(func(kvID, value any) bool {
+               if doc, ok := kvCache.LoadKvDoc(kvID.(string)); ok {
                        docs = append(docs, doc)
-                       continue
+               } else {
+                       kvIdsLeft = append(kvIdsLeft, kvID.(string))
                }
-               kvIdsLeft = append(kvIdsLeft, kvID)
+               return true
+       })
+       tpData, err := kvCache.getKvFromEtcd(ctx, req, kvIdsLeft)
+       if err != nil {
+               return nil, true, err
        }
-
-       tpData := kvCache.getKvFromEtcd(ctx, req, kvIdsLeft)
        docs = append(docs, tpData...)
 
        for _, doc := range docs {
@@ -257,17 +261,18 @@ func Search(ctx context.Context, req *CacheSearchReq) 
(*model.KVResponse, bool)
                }
        }
        result.Total = len(result.Data)
-       return result, true
+       return result, true, nil
 }
 
-func (kc *Cache) getKvFromEtcd(ctx context.Context, req *CacheSearchReq, 
kvIdsLeft []string) []*model.KVDoc {
+func (kc *Cache) getKvFromEtcd(ctx context.Context, req *CacheSearchReq, 
kvIdsLeft []string) ([]*model.KVDoc, error) {
        if len(kvIdsLeft) == 0 {
-               return nil
+               return nil, nil
        }
 
        openlog.Debug("get kv from etcd by kvId")
        wg := sync.WaitGroup{}
        docs := make([]*model.KVDoc, len(kvIdsLeft))
+       var getKvErr error
        for i, kvID := range kvIdsLeft {
                wg.Add(1)
                go func(kvID string, cnt int) {
@@ -277,12 +282,14 @@ func (kc *Cache) getKvFromEtcd(ctx context.Context, req 
*CacheSearchReq, kvIdsLe
                        kv, err := etcdadpt.Get(ctx, docKey)
                        if err != nil {
                                openlog.Error(fmt.Sprintf("failed to get kv 
from etcd, err %v", err))
+                               getKvErr = err
                                return
                        }
 
                        doc, err := kc.GetKvDoc(kv)
                        if err != nil {
                                openlog.Error(fmt.Sprintf("failed to unmarshal 
kv, err %v", err))
+                               getKvErr = err
                                return
                        }
 
@@ -291,7 +298,10 @@ func (kc *Cache) getKvFromEtcd(ctx context.Context, req 
*CacheSearchReq, kvIdsLe
                }(kvID, i)
        }
        wg.Wait()
-       return docs
+       if getKvErr != nil {
+               return nil, getKvErr
+       }
+       return docs, nil
 }
 
 func isMatch(req *CacheSearchReq, doc *model.KVDoc) bool {
diff --git a/server/datasource/etcd/kv/kv_dao.go 
b/server/datasource/etcd/kv/kv_dao.go
index 2332b58..d7260df 100644
--- a/server/datasource/etcd/kv/kv_dao.go
+++ b/server/datasource/etcd/kv/kv_dao.go
@@ -524,15 +524,18 @@ func (s *Dao) listData(ctx context.Context, project, 
domain string, options ...d
        }
 
        if Enabled() {
-               result, useCache := Search(ctx, &CacheSearchReq{
+               result, useCache, err := Search(ctx, &CacheSearchReq{
                        Domain:  domain,
                        Project: project,
                        Opts:    &opts,
                        Regex:   regex,
                })
-               if useCache {
+               if useCache && err == nil {
                        return result, opts, nil
                }
+               if useCache && err != nil {
+                       openlog.Error("using cache to search kv failed: " + 
err.Error())
+               }
        }
 
        result, err := matchLabelsSearch(ctx, domain, project, regex, opts)

Reply via email to