This is an automated email from the ASF dual-hosted git repository.

tianxiaoliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/servicecomb-kie.git


The following commit(s) were added to refs/heads/master by this push:
     new 9f0d306  optimize performance with cache (#212)
9f0d306 is described below

commit 9f0d306ae9e04b1ec5570de6db41ee6f7d768d73
Author: Shawn <[email protected]>
AuthorDate: Fri Aug 27 09:59:14 2021 +0800

    optimize performance with cache (#212)
---
 .github/workflows/golangci-lint.yml                |   2 +-
 cmd/kieserver/main.go                              |   3 +
 go.mod                                             |   2 +-
 go.sum                                             |   2 +
 pkg/model/kv.go                                    |   5 +-
 server/cache/long_polling.go                       |  43 ++++++++
 .../datasource/mongo/track/polling_detail_dao.go   |  10 +-
 server/pubsub/bus.go                               |  32 +++---
 server/pubsub/bus_test.go                          |  10 +-
 server/pubsub/event_handler.go                     |  94 ++++------------
 server/pubsub/notifier/kv.go                       | 119 +++++++++++++++++++++
 server/pubsub/struct.go                            |   4 +-
 server/pubsub/struct_test.go                       |   2 +-
 server/resource/v1/common.go                       |  34 ++++--
 server/resource/v1/history_resource_test.go        |   3 +-
 server/resource/v1/kv_resource.go                  |   4 +-
 server/service/kv/kv_svc.go                        |  25 +++--
 test/benchmark/watch.go                            |   3 +-
 test/init.go                                       |   1 +
 19 files changed, 269 insertions(+), 129 deletions(-)

diff --git a/.github/workflows/golangci-lint.yml 
b/.github/workflows/golangci-lint.yml
index 63aef9d..51166d6 100644
--- a/.github/workflows/golangci-lint.yml
+++ b/.github/workflows/golangci-lint.yml
@@ -16,7 +16,7 @@ jobs:
         uses: golangci/golangci-lint-action@v2
         with:
           version: v1.29
-          args: --skip-dirs=examples,test --skip-files=.*_test.go$
+          args: --enable gofmt,gocyclo,goimports,dupl,gosec 
--skip-dirs=examples,test --skip-files=.*_test.go$
   static-checks:
     runs-on: ubuntu-latest
     env:
diff --git a/cmd/kieserver/main.go b/cmd/kieserver/main.go
index 2fe226e..aa3228e 100644
--- a/cmd/kieserver/main.go
+++ b/cmd/kieserver/main.go
@@ -38,6 +38,9 @@ import (
        _ "github.com/apache/servicecomb-kie/server/plugin/qms"
        //noop cipher
        _ "github.com/go-chassis/go-chassis/v2/security/cipher/plugins/plain"
+
+       // event notifier
+       _ "github.com/apache/servicecomb-kie/server/pubsub/notifier"
 )
 
 func main() {
diff --git a/go.mod b/go.mod
index c8e0a93..c1d1160 100644
--- a/go.mod
+++ b/go.mod
@@ -8,9 +8,9 @@ require (
        github.com/go-chassis/go-chassis/v2 v2.2.1-0.20210810140748-7274d2228000
        github.com/go-chassis/openlog v1.1.3
        github.com/go-chassis/seclog v1.3.0
+       github.com/gofrs/uuid v4.0.0+incompatible
        github.com/hashicorp/serf v0.9.5
        github.com/little-cui/etcdadpt v0.1.2
-       github.com/satori/go.uuid v1.2.0
        github.com/stretchr/testify v1.7.0
        github.com/urfave/cli v1.22.4
        go.mongodb.org/mongo-driver v1.4.6
diff --git a/go.sum b/go.sum
index 3e7aeef..e044869 100644
--- a/go.sum
+++ b/go.sum
@@ -258,6 +258,8 @@ github.com/gobuffalo/packr/v2 v2.2.0 
h1:Ir9W9XIm9j7bhhkKE9cokvtTl1vBm62A/fene/ZC
 github.com/gobuffalo/packr/v2 v2.2.0/go.mod 
h1:CaAwI0GPIAv+5wKLtv8Afwl+Cm78K/I/VCm/3ptBN+0=
 github.com/gobuffalo/syncx v0.0.0-20190224160051-33c29581e754 
h1:tpom+2CJmpzAWj5/VEHync2rJGi+epHNIeRSWjzGA+4=
 github.com/gobuffalo/syncx v0.0.0-20190224160051-33c29581e754/go.mod 
h1:HhnNqWY95UYwwW3uSASeV7vtgYkT2t16hJgV3AEPUpw=
+github.com/gofrs/uuid v4.0.0+incompatible 
h1:1SD/1F5pU8p29ybwgQSwpQk+mwdRrXCYuPhW6m+TnJw=
+github.com/gofrs/uuid v4.0.0+incompatible/go.mod 
h1:b2aQJv3Z4Fp6yNu3cdSllBxTCLRxnplIgP/c0N/04lM=
 github.com/gogo/googleapis v1.1.0 
h1:kFkMAZBNAn4j7K0GiZr8cRYzejq68VbheufiV3YuyFI=
 github.com/gogo/googleapis v1.1.0/go.mod 
h1:gf4bu3Q80BeJ6H1S1vYPm8/ELATdvryBaNFGgqEef3s=
 github.com/gogo/protobuf v1.1.1/go.mod 
h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ=
diff --git a/pkg/model/kv.go b/pkg/model/kv.go
index ce3b422..f29186b 100644
--- a/pkg/model/kv.go
+++ b/pkg/model/kv.go
@@ -28,9 +28,8 @@ type KVRequest struct {
 
 //KVResponse represents the key value list
 type KVResponse struct {
-       LabelDoc *LabelDocResponse `json:"label,omitempty"`
-       Total    int               `json:"total"`
-       Data     []*KVDoc          `json:"data"`
+       Total int      `json:"total"`
+       Data  []*KVDoc `json:"data"`
 }
 
 //LabelDocResponse is label struct
diff --git a/server/cache/long_polling.go b/server/cache/long_polling.go
new file mode 100644
index 0000000..b1162ce
--- /dev/null
+++ b/server/cache/long_polling.go
@@ -0,0 +1,43 @@
+package cache
+
+import (
+       "sync"
+
+       "github.com/apache/servicecomb-kie/pkg/model"
+       "github.com/go-chassis/cari/pkg/errsvc"
+)
+
+var pollingCache = &LongPollingCache{}
+
+//LongPollingCache exchange space for time
+type LongPollingCache struct {
+       m sync.Map
+}
+type DBResult struct {
+       KVs *model.KVResponse
+       Err *errsvc.Error
+       Rev int64
+}
+
+func CachedKV() *LongPollingCache {
+       return pollingCache
+}
+
+//Read reads the cached query result
+//only need to filter by labels if match pattern is exact
+func (c *LongPollingCache) Read(topic string) (int64, *model.KVResponse, 
*errsvc.Error) {
+       value, ok := c.m.Load(topic)
+       if !ok {
+               return 0, nil, nil
+       }
+       t := value.(*DBResult)
+       if t.Err != nil {
+               return 0, nil, t.Err
+       }
+       return t.Rev, t.KVs, nil
+
+}
+
+func (c *LongPollingCache) Write(topic string, r *DBResult) {
+       c.m.Store(topic, r)
+}
diff --git a/server/datasource/mongo/track/polling_detail_dao.go 
b/server/datasource/mongo/track/polling_detail_dao.go
index 5a10590..5c51e67 100644
--- a/server/datasource/mongo/track/polling_detail_dao.go
+++ b/server/datasource/mongo/track/polling_detail_dao.go
@@ -24,7 +24,7 @@ import (
        "github.com/apache/servicecomb-kie/server/datasource"
        "github.com/apache/servicecomb-kie/server/datasource/mongo/session"
        "github.com/go-chassis/openlog"
-       "github.com/satori/go.uuid"
+       "github.com/gofrs/uuid"
        "go.mongodb.org/mongo-driver/bson"
        "go.mongodb.org/mongo-driver/mongo"
 )
@@ -41,8 +41,12 @@ func (s *Dao) CreateOrUpdate(ctx context.Context, detail 
*model.PollingDetail) (
        res := collection.FindOne(ctx, queryFilter)
        if res.Err() != nil {
                if res.Err() == mongo.ErrNoDocuments {
-                       detail.ID = uuid.NewV4().String()
-                       _, err := collection.InsertOne(ctx, detail)
+                       id, err := uuid.NewV4()
+                       if err != nil {
+                               return nil, err
+                       }
+                       detail.ID = id.String()
+                       _, err = collection.InsertOne(ctx, detail)
                        if err != nil {
                                return nil, err
                        }
diff --git a/server/pubsub/bus.go b/server/pubsub/bus.go
index b8b2b18..063a92a 100644
--- a/server/pubsub/bus.go
+++ b/server/pubsub/bus.go
@@ -22,6 +22,7 @@ import (
        "time"
 
        "encoding/json"
+
        "github.com/apache/servicecomb-kie/pkg/stringutil"
        "github.com/apache/servicecomb-kie/server/config"
        "github.com/go-chassis/openlog"
@@ -39,9 +40,12 @@ const (
        DefaultEventBatchInterval = 500 * time.Millisecond
 )
 
-var mutexObservers sync.RWMutex
 var topics sync.Map
 
+func Topics() *sync.Map {
+       return &topics
+}
+
 //Bus is message bug
 type Bus struct {
        agent *agent.Agent
@@ -87,12 +91,7 @@ func Start() {
                openlog.Fatal("can not sync key value change events to other 
kie nodes" + err.Error())
        }
        openlog.Info("kie message bus started")
-       eh := &EventHandler{
-               BatchInterval: DefaultEventBatchInterval,
-               BatchSize:     DefaultEventBatchSize,
-               Immediate:     true,
-       }
-       go eh.RunFlushTask()
+       eh := &ClusterEventHandler{}
        bus.agent.RegisterEventHandler(eh)
 }
 func join(addresses []string) error {
@@ -114,24 +113,23 @@ func Publish(event *KVChangeEvent) error {
 }
 
 //AddObserver observe key changes by (key or labels) or (key and labels)
-func AddObserver(o *Observer, topic *Topic) error {
+func AddObserver(o *Observer, topic *Topic) (string, error) {
        topic.LabelsFormat = stringutil.FormatMap(topic.Labels)
        b, err := json.Marshal(topic)
        if err != nil {
-               return err
+               return "", err
        }
        t := string(b)
        observers, ok := topics.Load(t)
        if !ok {
-               topics.Store(t, map[string]*Observer{
-                       o.UUID: o,
-               })
+               var observers = &sync.Map{}
+               observers.Store(o.UUID, o)
+               topics.Store(t, observers)
                openlog.Info("new topic:" + t)
-               return nil
+               return t, nil
        }
-       mutexObservers.Lock()
-       observers.(map[string]*Observer)[o.UUID] = o
-       mutexObservers.Unlock()
+       m := observers.(*sync.Map)
+       m.Store(o.UUID, o)
        openlog.Debug("add new observer for topic:" + t)
-       return nil
+       return t, nil
 }
diff --git a/server/pubsub/bus_test.go b/server/pubsub/bus_test.go
index 09cfd1b..5df3b34 100644
--- a/server/pubsub/bus_test.go
+++ b/server/pubsub/bus_test.go
@@ -20,21 +20,23 @@ package pubsub_test
 import (
        "testing"
 
+       _ "github.com/apache/servicecomb-kie/test"
+       "github.com/gofrs/uuid"
+
        "github.com/apache/servicecomb-kie/server/config"
        "github.com/apache/servicecomb-kie/server/pubsub"
-       "github.com/satori/go.uuid"
 )
 
 func TestInit(t *testing.T) {
        config.Configurations = &config.Config{}
        pubsub.Init()
        pubsub.Start()
-
+       id, _ := uuid.NewV4()
        o := &pubsub.Observer{
-               UUID:  uuid.NewV4().String(),
+               UUID:  id.String(),
                Event: make(chan *pubsub.KVChangeEvent, 1),
        }
-       _ = pubsub.AddObserver(o, &pubsub.Topic{
+       _, _ = pubsub.AddObserver(o, &pubsub.Topic{
                Project:  "1",
                DomainID: "2",
                Labels: map[string]string{
diff --git a/server/pubsub/event_handler.go b/server/pubsub/event_handler.go
index 1858b62..e6ecdb7 100644
--- a/server/pubsub/event_handler.go
+++ b/server/pubsub/event_handler.go
@@ -19,97 +19,39 @@ package pubsub
 
 import (
        "strings"
-       "sync"
-       "time"
 
        "github.com/go-chassis/openlog"
+       "github.com/hashicorp/serf/cmd/serf/command/agent"
        "github.com/hashicorp/serf/serf"
 )
 
-//EventHandler handler serf custom event, it is singleton
-type EventHandler struct {
-       BatchSize          int
-       BatchInterval      time.Duration
-       Immediate          bool
-       pendingEvents      sync.Map
-       pendingEventsCount int
+var handlers = make(map[string]agent.EventHandler)
+
+func RegisterHandler(typ string, h agent.EventHandler) {
+       handlers[typ] = h
+       openlog.Info("register handler for:" + typ)
+}
+
+//ClusterEventHandler handler serf custom event, it is singleton
+type ClusterEventHandler struct {
 }
 
 //HandleEvent send event to subscribers
-func (h *EventHandler) HandleEvent(e serf.Event) {
+func (h *ClusterEventHandler) HandleEvent(e serf.Event) {
        openlog.Debug("receive event:" + e.EventType().String())
        switch e.EventType().String() {
        case "user":
-               if strings.Contains(e.String(), EventKVChange) {
-                       h.handleKVEvent(e)
-               }
-       }
-
-}
-func (h *EventHandler) RunFlushTask() {
-       for {
-               if h.pendingEventsCount >= h.BatchSize {
-                       h.fireEvents()
-               }
-               <-time.After(h.BatchInterval)
-               h.fireEvents()
+               h.DispatchEvent(e)
        }
 
 }
-func (h *EventHandler) handleKVEvent(e serf.Event) {
-       ue := e.(serf.UserEvent)
-       ke, err := NewKVChangeEvent(ue.Payload)
-       if err != nil {
-               openlog.Error("invalid json:" + string(ue.Payload))
-       }
-       openlog.Debug("kv event:" + ke.Key)
-       if h.Immediate { //never retain event, not recommended
-               h.FindTopicAndFire(ke)
-       } else {
-               h.mergeAndSave(ke)
-       }
 
-}
-func (h *EventHandler) mergeAndSave(ke *KVChangeEvent) {
-       id := ke.String()
-       _, ok := h.pendingEvents.Load(id)
-       if ok {
-               openlog.Debug("ignore same event: " + id)
+func (h *ClusterEventHandler) DispatchEvent(e serf.Event) {
+       typ := strings.Replace(e.String(), "user-event: ", "", 1)
+       eh, ok := handlers[typ]
+       if !ok {
+               openlog.Warn("can not handle:" + typ)
                return
        }
-       h.pendingEvents.Store(id, ke)
-       h.pendingEventsCount++
-}
-func (h *EventHandler) fireEvents() {
-       h.pendingEvents.Range(func(key, value interface{}) bool {
-               ke := value.(*KVChangeEvent)
-               h.FindTopicAndFire(ke)
-               h.pendingEvents.Delete(key)
-               h.pendingEventsCount--
-               return true
-       })
-}
-
-func (h *EventHandler) FindTopicAndFire(ke *KVChangeEvent) {
-       topics.Range(func(key, value interface{}) bool { //range all topics
-               t, err := ParseTopicString(key.(string))
-               if err != nil {
-                       openlog.Error("can not parse topic " + key.(string) + 
": " + err.Error())
-                       return true
-               }
-               if t.Match(ke) {
-                       fireEvent(value, ke)
-               }
-               return true
-       })
-}
-
-func fireEvent(value interface{}, ke *KVChangeEvent) {
-       observers := value.(map[string]*Observer)
-       mutexObservers.Lock()
-       defer mutexObservers.Unlock()
-       for k, v := range observers {
-               v.Event <- ke
-               delete(observers, k)
-       }
+       eh.HandleEvent(e)
 }
diff --git a/server/pubsub/notifier/kv.go b/server/pubsub/notifier/kv.go
new file mode 100644
index 0000000..634f1c3
--- /dev/null
+++ b/server/pubsub/notifier/kv.go
@@ -0,0 +1,119 @@
+package notifier
+
+import (
+       "context"
+       "sync"
+       "time"
+
+       "github.com/apache/servicecomb-kie/pkg/model"
+       "github.com/apache/servicecomb-kie/server/cache"
+       "github.com/apache/servicecomb-kie/server/pubsub"
+       kvsvc "github.com/apache/servicecomb-kie/server/service/kv"
+       "github.com/go-chassis/openlog"
+       "github.com/hashicorp/serf/serf"
+)
+
+//KVHandler handler serf custom event, it is singleton
+type KVHandler struct {
+       BatchSize          int
+       BatchInterval      time.Duration
+       Immediate          bool
+       pendingEvents      sync.Map
+       pendingEventsCount int
+}
+
+func (h *KVHandler) RunFlushTask() {
+       for {
+               if h.pendingEventsCount >= h.BatchSize {
+                       h.fireEvents()
+               }
+               <-time.After(h.BatchInterval)
+               h.fireEvents()
+       }
+
+}
+func (h *KVHandler) HandleEvent(e serf.Event) {
+       ue := e.(serf.UserEvent)
+       ke, err := pubsub.NewKVChangeEvent(ue.Payload)
+       if err != nil {
+               openlog.Error("invalid json:" + string(ue.Payload))
+       }
+       openlog.Debug("kv event:" + ke.Key)
+       if h.Immediate { //never retain event, not recommended
+               h.FindTopicAndFire(ke)
+       } else {
+               h.mergeAndSave(ke)
+       }
+
+}
+func (h *KVHandler) mergeAndSave(ke *pubsub.KVChangeEvent) {
+       id := ke.String()
+       _, ok := h.pendingEvents.Load(id)
+       if ok {
+               openlog.Debug("ignore same event: " + id)
+               return
+       }
+       h.pendingEvents.Store(id, ke)
+       h.pendingEventsCount++
+}
+func (h *KVHandler) fireEvents() {
+       h.pendingEvents.Range(func(key, value interface{}) bool {
+               ke := value.(*pubsub.KVChangeEvent)
+               h.FindTopicAndFire(ke)
+               h.pendingEvents.Delete(key)
+               h.pendingEventsCount--
+               return true
+       })
+}
+
+func (h *KVHandler) FindTopicAndFire(ke *pubsub.KVChangeEvent) {
+       topic := pubsub.Topics()
+       topic.Range(func(key, value interface{}) bool { //range all topics
+               t, err := pubsub.ParseTopic(key.(string))
+               if err != nil {
+                       openlog.Error("can not parse topic " + key.(string) + 
": " + err.Error())
+                       return true
+               }
+               if t.Match(ke) {
+                       prepareCache(key.(string), t)
+                       notifyAndRemoveObservers(value, ke)
+               }
+               return true
+       })
+}
+
+func prepareCache(topicName string, topic *pubsub.Topic) {
+       rev, kvs, err := kvsvc.ListKV(context.TODO(), &model.ListKVRequest{
+               Domain:  topic.DomainID,
+               Project: topic.Project,
+               Labels:  topic.Labels,
+               Match:   topic.MatchType,
+       })
+       if err != nil {
+               openlog.Error("can not query kvs:" + err.Error())
+       }
+       cache.CachedKV().Write(topicName, &cache.DBResult{
+               KVs: kvs,
+               Rev: rev,
+               Err: err,
+       })
+}
+
+func notifyAndRemoveObservers(value interface{}, ke *pubsub.KVChangeEvent) {
+       observers := value.(*sync.Map)
+       observers.Range(func(id, value interface{}) bool {
+               observer := value.(*pubsub.Observer)
+               observer.Event <- ke
+               observers.Delete(id)
+               return true
+       })
+}
+func init() {
+       h := &KVHandler{
+               BatchInterval: pubsub.DefaultEventBatchInterval,
+               BatchSize:     pubsub.DefaultEventBatchSize,
+               Immediate:     true,
+       }
+       pubsub.RegisterHandler(pubsub.EventKVChange, h)
+       go h.RunFlushTask()
+}
diff --git a/server/pubsub/struct.go b/server/pubsub/struct.go
index 1a2d5d9..34e4a0d 100644
--- a/server/pubsub/struct.go
+++ b/server/pubsub/struct.go
@@ -62,8 +62,8 @@ type Topic struct {
        MatchType    string            `json:"match,omitempty"`
 }
 
-//ParseTopicString parse topic string to topic struct
-func ParseTopicString(s string) (*Topic, error) {
+//ParseTopic parse topic string to topic struct
+func ParseTopic(s string) (*Topic, error) {
        t := &Topic{
                Labels: make(map[string]string),
        }
diff --git a/server/pubsub/struct_test.go b/server/pubsub/struct_test.go
index 8069284..2ad99f2 100644
--- a/server/pubsub/struct_test.go
+++ b/server/pubsub/struct_test.go
@@ -49,6 +49,6 @@ func TestTopic_String(t *testing.T) {
        t.Log(string(b))
 
        mock := []byte(`{"labels":"a=b::c=d","domainID":"2","project":"1"}`)
-       topic, _ = pubsub.ParseTopicString(string(mock))
+       topic, _ = pubsub.ParseTopic(string(mock))
        t.Log(topic)
 }
diff --git a/server/resource/v1/common.go b/server/resource/v1/common.go
index 0feab3f..c89085b 100644
--- a/server/resource/v1/common.go
+++ b/server/resource/v1/common.go
@@ -21,13 +21,15 @@ import (
        "context"
        "encoding/json"
        "errors"
-       "github.com/satori/go.uuid"
        "net/http"
        "strconv"
        "strings"
        "sync"
        "time"
 
+       "github.com/apache/servicecomb-kie/server/cache"
+       "github.com/gofrs/uuid"
+
        "github.com/apache/servicecomb-kie/server/datasource"
        kvsvc "github.com/apache/servicecomb-kie/server/service/kv"
        "github.com/go-chassis/cari/rbac"
@@ -55,8 +57,12 @@ const (
 
 var observers = sync.Pool{
        New: func() interface{} {
+               id, err := uuid.NewV4()
+               if err != nil {
+                       openlog.Error("can not gen uuid")
+               }
                return &pubsub.Observer{
-                       UUID:  uuid.NewV4().String(),
+                       UUID:  id.String(),
                        Event: make(chan *pubsub.KVChangeEvent, 1),
                }
        },
@@ -187,24 +193,24 @@ func getMatchPattern(rctx *restful.Context) string {
        }
        return m
 }
-func eventHappened(waitStr string, topic *pubsub.Topic) (bool, error) {
+func eventHappened(waitStr string, topic *pubsub.Topic) (bool, string, error) {
        d, err := time.ParseDuration(waitStr)
        if err != nil || d > common.MaxWait {
-               return false, errors.New(common.MsgInvalidWait)
+               return false, "", errors.New(common.MsgInvalidWait)
        }
        happened := true
        o := observers.Get().(*pubsub.Observer)
        defer observers.Put(o)
-       err = pubsub.AddObserver(o, topic)
+       topicName, err := pubsub.AddObserver(o, topic)
        if err != nil {
-               return false, errors.New("observe once failed: " + err.Error())
+               return false, "", errors.New("observe once failed: " + 
err.Error())
        }
        select {
        case <-time.After(d):
                happened = false
        case <-o.Event:
        }
-       return happened, nil
+       return happened, topicName, nil
 }
 
 // size from 1 to start
@@ -254,7 +260,19 @@ func checkDomainAndProject(domain, project string) error {
        }
        return nil
 }
-
+func queryFromCache(rctx *restful.Context, topic string) {
+       rev, kv, queryErr := cache.CachedKV().Read(topic)
+       if queryErr != nil {
+               WriteErrResponse(rctx, queryErr.Code, queryErr.Message)
+               return
+       }
+       rctx.ReadResponseWriter().Header().Set(common.HeaderRevision, 
strconv.FormatInt(rev, 10))
+       err := writeResponse(rctx, kv)
+       rctx.ReadRestfulRequest().SetAttribute(common.RespBodyContextKey, 
kv.Data)
+       if err != nil {
+               openlog.Error(err.Error())
+       }
+}
 func queryAndResponse(rctx *restful.Context, request *model.ListKVRequest) {
        rev, kv, queryErr := kvsvc.ListKV(rctx.Ctx, request)
        if queryErr != nil {
diff --git a/server/resource/v1/history_resource_test.go 
b/server/resource/v1/history_resource_test.go
index 69c691c..29cd07d 100644
--- a/server/resource/v1/history_resource_test.go
+++ b/server/resource/v1/history_resource_test.go
@@ -20,12 +20,13 @@ import (
        "context"
        "encoding/json"
        "fmt"
-       kvsvc "github.com/apache/servicecomb-kie/server/service/kv"
        "io/ioutil"
        "net/http"
        "net/http/httptest"
        "testing"
 
+       kvsvc "github.com/apache/servicecomb-kie/server/service/kv"
+
        common2 "github.com/apache/servicecomb-kie/pkg/common"
 
        "github.com/apache/servicecomb-kie/pkg/model"
diff --git a/server/resource/v1/kv_resource.go 
b/server/resource/v1/kv_resource.go
index 0ac3242..dc709ff 100644
--- a/server/resource/v1/kv_resource.go
+++ b/server/resource/v1/kv_resource.go
@@ -244,7 +244,7 @@ func isLegalWaitRequest(rctx *restful.Context, request 
*model.ListKVRequest) boo
        return true
 }
 func watch(rctx *restful.Context, request *model.ListKVRequest, wait string) 
bool {
-       changed, err := eventHappened(wait, &pubsub.Topic{
+       changed, topic, err := eventHappened(wait, &pubsub.Topic{
                Labels:    request.Labels,
                Project:   request.Project,
                MatchType: request.Match,
@@ -255,7 +255,7 @@ func watch(rctx *restful.Context, request 
*model.ListKVRequest, wait string) boo
                return true
        }
        if changed {
-               queryAndResponse(rctx, request)
+               queryFromCache(rctx, topic)
                return true
        }
        return false
diff --git a/server/service/kv/kv_svc.go b/server/service/kv/kv_svc.go
index f92c9b0..3ed06ec 100644
--- a/server/service/kv/kv_svc.go
+++ b/server/service/kv/kv_svc.go
@@ -33,14 +33,12 @@ import (
        "github.com/go-chassis/foundation/validator"
        "github.com/go-chassis/go-chassis/v2/pkg/backends/quota"
        "github.com/go-chassis/openlog"
-       "github.com/satori/go.uuid"
+       "github.com/gofrs/uuid"
 )
 
-var sema = concurrency.NewSemaphore(concurrency.DefaultConcurrency)
+var listSema = concurrency.NewSemaphore(concurrency.DefaultConcurrency)
 
 func ListKV(ctx context.Context, request *model.ListKVRequest) (int64, 
*model.KVResponse, *errsvc.Error) {
-       sema.Acquire()
-       defer sema.Release()
        opts := []datasource.FindOption{
                datasource.WithKey(request.Key),
                datasource.WithLabels(request.Labels),
@@ -58,7 +56,7 @@ func ListKV(ctx context.Context, request 
*model.ListKVRequest) (int64, *model.KV
        if err != nil {
                return rev, nil, config.NewError(config.ErrInternal, 
err.Error())
        }
-       kv, err := datasource.GetBroker().GetKVDao().List(ctx, request.Project, 
request.Domain, opts...)
+       kv, err := List(ctx, request.Project, request.Domain, opts...)
        if err != nil {
                openlog.Error("common: " + err.Error())
                return rev, nil, config.NewError(config.ErrInternal, 
common.MsgDBError)
@@ -108,7 +106,11 @@ func Create(ctx context.Context, kv *model.KVDoc) 
(*model.KVDoc, *errsvc.Error)
                openlog.Error(err.Error())
                return nil, config.NewError(config.ErrInternal, "create kv 
failed")
        }
-       completeKV(kv, revision)
+       err = completeKV(kv, revision)
+       if err != nil {
+               openlog.Error(err.Error())
+               return nil, config.NewError(config.ErrInternal, "create kv 
failed")
+       }
        kv, err = datasource.GetBroker().GetKVDao().Create(ctx, kv)
        if err != nil {
                openlog.Error(fmt.Sprintf("post err:%s", err.Error()))
@@ -125,13 +127,18 @@ func Create(ctx context.Context, kv *model.KVDoc) 
(*model.KVDoc, *errsvc.Error)
        return kv, nil
 }
 
-func completeKV(kv *model.KVDoc, revision int64) {
-       kv.ID = uuid.NewV4().String()
+func completeKV(kv *model.KVDoc, revision int64) error {
+       id, err := uuid.NewV4()
+       if err != nil {
+               return err
+       }
+       kv.ID = id.String()
        kv.UpdateRevision = revision
        kv.CreateRevision = revision
        now := time.Now().Unix()
        kv.CreateTime = now
        kv.UpdateTime = now
+       return nil
 }
 
 func Upload(ctx context.Context, request *model.UploadKVRequest) 
*model.DocRespOfUpload {
@@ -282,5 +289,7 @@ func Get(ctx context.Context, req *model.GetKVRequest) 
(*model.KVDoc, error) {
        return datasource.GetBroker().GetKVDao().Get(ctx, req)
 }
 func List(ctx context.Context, project, domain string, options 
...datasource.FindOption) (*model.KVResponse, error) {
+       listSema.Acquire()
+       defer listSema.Release()
        return datasource.GetBroker().GetKVDao().List(ctx, project, domain, 
options...)
 }
diff --git a/test/benchmark/watch.go b/test/benchmark/watch.go
index 1c14ea6..f702adf 100644
--- a/test/benchmark/watch.go
+++ b/test/benchmark/watch.go
@@ -14,7 +14,7 @@ import (
 var success int32
 var fail int32
 var total int32
-var clientNum = 10000
+var clientNum = 5000
 var wg = sync.WaitGroup{}
 var client = &http.Client{}
 var start time.Time
@@ -77,6 +77,5 @@ func watch(req *http.Request) error {
                log.Println(res.Status)
                return errors.New("not OK")
        }
-
        return nil
 }
diff --git a/test/init.go b/test/init.go
index 696486e..c037994 100644
--- a/test/init.go
+++ b/test/init.go
@@ -26,6 +26,7 @@ import (
 
        _ "github.com/apache/servicecomb-kie/server/datasource/etcd"
        _ "github.com/apache/servicecomb-kie/server/datasource/mongo"
+       _ "github.com/apache/servicecomb-kie/server/pubsub/notifier"
        _ "github.com/go-chassis/go-chassis/v2/security/cipher/plugins/plain"
        _ "github.com/little-cui/etcdadpt/embedded"
        _ "github.com/little-cui/etcdadpt/remote"

Reply via email to