This is an automated email from the ASF dual-hosted git repository.
tianxiaoliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/servicecomb-kie.git
The following commit(s) were added to refs/heads/master by this push:
new 9f0d306 optimize performance with cache (#212)
9f0d306 is described below
commit 9f0d306ae9e04b1ec5570de6db41ee6f7d768d73
Author: Shawn <[email protected]>
AuthorDate: Fri Aug 27 09:59:14 2021 +0800
optimize performance with cache (#212)
---
.github/workflows/golangci-lint.yml | 2 +-
cmd/kieserver/main.go | 3 +
go.mod | 2 +-
go.sum | 2 +
pkg/model/kv.go | 5 +-
server/cache/long_polling.go | 43 ++++++++
.../datasource/mongo/track/polling_detail_dao.go | 10 +-
server/pubsub/bus.go | 32 +++---
server/pubsub/bus_test.go | 10 +-
server/pubsub/event_handler.go | 94 ++++------------
server/pubsub/notifier/kv.go | 119 +++++++++++++++++++++
server/pubsub/struct.go | 4 +-
server/pubsub/struct_test.go | 2 +-
server/resource/v1/common.go | 34 ++++--
server/resource/v1/history_resource_test.go | 3 +-
server/resource/v1/kv_resource.go | 4 +-
server/service/kv/kv_svc.go | 25 +++--
test/benchmark/watch.go | 3 +-
test/init.go | 1 +
19 files changed, 269 insertions(+), 129 deletions(-)
diff --git a/.github/workflows/golangci-lint.yml
b/.github/workflows/golangci-lint.yml
index 63aef9d..51166d6 100644
--- a/.github/workflows/golangci-lint.yml
+++ b/.github/workflows/golangci-lint.yml
@@ -16,7 +16,7 @@ jobs:
uses: golangci/golangci-lint-action@v2
with:
version: v1.29
- args: --skip-dirs=examples,test --skip-files=.*_test.go$
+ args: --enable gofmt,gocyclo,goimports,dupl,gosec --skip-dirs=examples,test --skip-files=.*_test.go$
static-checks:
runs-on: ubuntu-latest
env:
diff --git a/cmd/kieserver/main.go b/cmd/kieserver/main.go
index 2fe226e..aa3228e 100644
--- a/cmd/kieserver/main.go
+++ b/cmd/kieserver/main.go
@@ -38,6 +38,9 @@ import (
_ "github.com/apache/servicecomb-kie/server/plugin/qms"
//noop cipher
_ "github.com/go-chassis/go-chassis/v2/security/cipher/plugins/plain"
+
+ // event notifier
+ _ "github.com/apache/servicecomb-kie/server/pubsub/notifier"
)
func main() {
diff --git a/go.mod b/go.mod
index c8e0a93..c1d1160 100644
--- a/go.mod
+++ b/go.mod
@@ -8,9 +8,9 @@ require (
github.com/go-chassis/go-chassis/v2 v2.2.1-0.20210810140748-7274d2228000
github.com/go-chassis/openlog v1.1.3
github.com/go-chassis/seclog v1.3.0
+ github.com/gofrs/uuid v4.0.0+incompatible
github.com/hashicorp/serf v0.9.5
github.com/little-cui/etcdadpt v0.1.2
- github.com/satori/go.uuid v1.2.0
github.com/stretchr/testify v1.7.0
github.com/urfave/cli v1.22.4
go.mongodb.org/mongo-driver v1.4.6
diff --git a/go.sum b/go.sum
index 3e7aeef..e044869 100644
--- a/go.sum
+++ b/go.sum
@@ -258,6 +258,8 @@ github.com/gobuffalo/packr/v2 v2.2.0
h1:Ir9W9XIm9j7bhhkKE9cokvtTl1vBm62A/fene/ZC
github.com/gobuffalo/packr/v2 v2.2.0/go.mod
h1:CaAwI0GPIAv+5wKLtv8Afwl+Cm78K/I/VCm/3ptBN+0=
github.com/gobuffalo/syncx v0.0.0-20190224160051-33c29581e754
h1:tpom+2CJmpzAWj5/VEHync2rJGi+epHNIeRSWjzGA+4=
github.com/gobuffalo/syncx v0.0.0-20190224160051-33c29581e754/go.mod
h1:HhnNqWY95UYwwW3uSASeV7vtgYkT2t16hJgV3AEPUpw=
+github.com/gofrs/uuid v4.0.0+incompatible h1:1SD/1F5pU8p29ybwgQSwpQk+mwdRrXCYuPhW6m+TnJw=
+github.com/gofrs/uuid v4.0.0+incompatible/go.mod h1:b2aQJv3Z4Fp6yNu3cdSllBxTCLRxnplIgP/c0N/04lM=
github.com/gogo/googleapis v1.1.0
h1:kFkMAZBNAn4j7K0GiZr8cRYzejq68VbheufiV3YuyFI=
github.com/gogo/googleapis v1.1.0/go.mod
h1:gf4bu3Q80BeJ6H1S1vYPm8/ELATdvryBaNFGgqEef3s=
github.com/gogo/protobuf v1.1.1/go.mod
h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ=
diff --git a/pkg/model/kv.go b/pkg/model/kv.go
index ce3b422..f29186b 100644
--- a/pkg/model/kv.go
+++ b/pkg/model/kv.go
@@ -28,9 +28,8 @@ type KVRequest struct {
//KVResponse represents the key value list
type KVResponse struct {
- LabelDoc *LabelDocResponse `json:"label,omitempty"`
- Total int `json:"total"`
- Data []*KVDoc `json:"data"`
+ Total int `json:"total"`
+ Data []*KVDoc `json:"data"`
}
//LabelDocResponse is label struct
diff --git a/server/cache/long_polling.go b/server/cache/long_polling.go
new file mode 100644
index 0000000..b1162ce
--- /dev/null
+++ b/server/cache/long_polling.go
@@ -0,0 +1,43 @@
+package cache
+
+import (
+ "sync"
+
+ "github.com/apache/servicecomb-kie/pkg/model"
+ "github.com/go-chassis/cari/pkg/errsvc"
+)
+
+var pollingCache = &LongPollingCache{}
+
+//LongPollingCache exchange space for time
+type LongPollingCache struct {
+ m sync.Map
+}
+type DBResult struct {
+ KVs *model.KVResponse
+ Err *errsvc.Error
+ Rev int64
+}
+
+func CachedKV() *LongPollingCache {
+ return pollingCache
+}
+
+//Read reads the cached query result
+//only need to filter by labels if match pattern is exact
+func (c *LongPollingCache) Read(topic string) (int64, *model.KVResponse, *errsvc.Error) {
+ value, ok := c.m.Load(topic)
+ if !ok {
+ return 0, nil, nil
+ }
+ t := value.(*DBResult)
+ if t.Err != nil {
+ return 0, nil, t.Err
+ }
+ return t.Rev, t.KVs, nil
+
+}
+
+func (c *LongPollingCache) Write(topic string, r *DBResult) {
+ c.m.Store(topic, r)
+}
diff --git a/server/datasource/mongo/track/polling_detail_dao.go
b/server/datasource/mongo/track/polling_detail_dao.go
index 5a10590..5c51e67 100644
--- a/server/datasource/mongo/track/polling_detail_dao.go
+++ b/server/datasource/mongo/track/polling_detail_dao.go
@@ -24,7 +24,7 @@ import (
"github.com/apache/servicecomb-kie/server/datasource"
"github.com/apache/servicecomb-kie/server/datasource/mongo/session"
"github.com/go-chassis/openlog"
- "github.com/satori/go.uuid"
+ "github.com/gofrs/uuid"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
)
@@ -41,8 +41,12 @@ func (s *Dao) CreateOrUpdate(ctx context.Context, detail
*model.PollingDetail) (
res := collection.FindOne(ctx, queryFilter)
if res.Err() != nil {
if res.Err() == mongo.ErrNoDocuments {
- detail.ID = uuid.NewV4().String()
- _, err := collection.InsertOne(ctx, detail)
+ id, err := uuid.NewV4()
+ if err != nil {
+ return nil, err
+ }
+ detail.ID = id.String()
+ _, err = collection.InsertOne(ctx, detail)
if err != nil {
return nil, err
}
diff --git a/server/pubsub/bus.go b/server/pubsub/bus.go
index b8b2b18..063a92a 100644
--- a/server/pubsub/bus.go
+++ b/server/pubsub/bus.go
@@ -22,6 +22,7 @@ import (
"time"
"encoding/json"
+
"github.com/apache/servicecomb-kie/pkg/stringutil"
"github.com/apache/servicecomb-kie/server/config"
"github.com/go-chassis/openlog"
@@ -39,9 +40,12 @@ const (
DefaultEventBatchInterval = 500 * time.Millisecond
)
-var mutexObservers sync.RWMutex
var topics sync.Map
+func Topics() *sync.Map {
+ return &topics
+}
+
//Bus is message bug
type Bus struct {
agent *agent.Agent
@@ -87,12 +91,7 @@ func Start() {
openlog.Fatal("can not sync key value change events to other
kie nodes" + err.Error())
}
openlog.Info("kie message bus started")
- eh := &EventHandler{
- BatchInterval: DefaultEventBatchInterval,
- BatchSize: DefaultEventBatchSize,
- Immediate: true,
- }
- go eh.RunFlushTask()
+ eh := &ClusterEventHandler{}
bus.agent.RegisterEventHandler(eh)
}
func join(addresses []string) error {
@@ -114,24 +113,23 @@ func Publish(event *KVChangeEvent) error {
}
//AddObserver observe key changes by (key or labels) or (key and labels)
-func AddObserver(o *Observer, topic *Topic) error {
+func AddObserver(o *Observer, topic *Topic) (string, error) {
topic.LabelsFormat = stringutil.FormatMap(topic.Labels)
b, err := json.Marshal(topic)
if err != nil {
- return err
+ return "", err
}
t := string(b)
observers, ok := topics.Load(t)
if !ok {
- topics.Store(t, map[string]*Observer{
- o.UUID: o,
- })
+ var observers = &sync.Map{}
+ observers.Store(o.UUID, o)
+ topics.Store(t, observers)
openlog.Info("new topic:" + t)
- return nil
+ return t, nil
}
- mutexObservers.Lock()
- observers.(map[string]*Observer)[o.UUID] = o
- mutexObservers.Unlock()
+ m := observers.(*sync.Map)
+ m.Store(o.UUID, o)
openlog.Debug("add new observer for topic:" + t)
- return nil
+ return t, nil
}
diff --git a/server/pubsub/bus_test.go b/server/pubsub/bus_test.go
index 09cfd1b..5df3b34 100644
--- a/server/pubsub/bus_test.go
+++ b/server/pubsub/bus_test.go
@@ -20,21 +20,23 @@ package pubsub_test
import (
"testing"
+ _ "github.com/apache/servicecomb-kie/test"
+ "github.com/gofrs/uuid"
+
"github.com/apache/servicecomb-kie/server/config"
"github.com/apache/servicecomb-kie/server/pubsub"
- "github.com/satori/go.uuid"
)
func TestInit(t *testing.T) {
config.Configurations = &config.Config{}
pubsub.Init()
pubsub.Start()
-
+ id, _ := uuid.NewV4()
o := &pubsub.Observer{
- UUID: uuid.NewV4().String(),
+ UUID: id.String(),
Event: make(chan *pubsub.KVChangeEvent, 1),
}
- _ = pubsub.AddObserver(o, &pubsub.Topic{
+ _, _ = pubsub.AddObserver(o, &pubsub.Topic{
Project: "1",
DomainID: "2",
Labels: map[string]string{
diff --git a/server/pubsub/event_handler.go b/server/pubsub/event_handler.go
index 1858b62..e6ecdb7 100644
--- a/server/pubsub/event_handler.go
+++ b/server/pubsub/event_handler.go
@@ -19,97 +19,39 @@ package pubsub
import (
"strings"
- "sync"
- "time"
"github.com/go-chassis/openlog"
+ "github.com/hashicorp/serf/cmd/serf/command/agent"
"github.com/hashicorp/serf/serf"
)
-//EventHandler handler serf custom event, it is singleton
-type EventHandler struct {
- BatchSize int
- BatchInterval time.Duration
- Immediate bool
- pendingEvents sync.Map
- pendingEventsCount int
+var handlers = make(map[string]agent.EventHandler)
+
+func RegisterHandler(typ string, h agent.EventHandler) {
+ handlers[typ] = h
+ openlog.Info("register handler for:" + typ)
+}
+
+//ClusterEventHandler handler serf custom event, it is singleton
+type ClusterEventHandler struct {
}
//HandleEvent send event to subscribers
-func (h *EventHandler) HandleEvent(e serf.Event) {
+func (h *ClusterEventHandler) HandleEvent(e serf.Event) {
openlog.Debug("receive event:" + e.EventType().String())
switch e.EventType().String() {
case "user":
- if strings.Contains(e.String(), EventKVChange) {
- h.handleKVEvent(e)
- }
- }
-
-}
-func (h *EventHandler) RunFlushTask() {
- for {
- if h.pendingEventsCount >= h.BatchSize {
- h.fireEvents()
- }
- <-time.After(h.BatchInterval)
- h.fireEvents()
+ h.DispatchEvent(e)
}
}
-func (h *EventHandler) handleKVEvent(e serf.Event) {
- ue := e.(serf.UserEvent)
- ke, err := NewKVChangeEvent(ue.Payload)
- if err != nil {
- openlog.Error("invalid json:" + string(ue.Payload))
- }
- openlog.Debug("kv event:" + ke.Key)
- if h.Immediate { //never retain event, not recommended
- h.FindTopicAndFire(ke)
- } else {
- h.mergeAndSave(ke)
- }
-}
-func (h *EventHandler) mergeAndSave(ke *KVChangeEvent) {
- id := ke.String()
- _, ok := h.pendingEvents.Load(id)
- if ok {
- openlog.Debug("ignore same event: " + id)
+func (h *ClusterEventHandler) DispatchEvent(e serf.Event) {
+ typ := strings.Replace(e.String(), "user-event: ", "", 1)
+ eh, ok := handlers[typ]
+ if !ok {
+ openlog.Warn("can not handle:" + typ)
return
}
- h.pendingEvents.Store(id, ke)
- h.pendingEventsCount++
-}
-func (h *EventHandler) fireEvents() {
- h.pendingEvents.Range(func(key, value interface{}) bool {
- ke := value.(*KVChangeEvent)
- h.FindTopicAndFire(ke)
- h.pendingEvents.Delete(key)
- h.pendingEventsCount--
- return true
- })
-}
-
-func (h *EventHandler) FindTopicAndFire(ke *KVChangeEvent) {
- topics.Range(func(key, value interface{}) bool { //range all topics
- t, err := ParseTopicString(key.(string))
- if err != nil {
- openlog.Error("can not parse topic " + key.(string) +
": " + err.Error())
- return true
- }
- if t.Match(ke) {
- fireEvent(value, ke)
- }
- return true
- })
-}
-
-func fireEvent(value interface{}, ke *KVChangeEvent) {
- observers := value.(map[string]*Observer)
- mutexObservers.Lock()
- defer mutexObservers.Unlock()
- for k, v := range observers {
- v.Event <- ke
- delete(observers, k)
- }
+ eh.HandleEvent(e)
}
diff --git a/server/pubsub/notifier/kv.go b/server/pubsub/notifier/kv.go
new file mode 100644
index 0000000..634f1c3
--- /dev/null
+++ b/server/pubsub/notifier/kv.go
@@ -0,0 +1,119 @@
+package notifier
+
+import (
+ "context"
+ "sync"
+ "time"
+
+ "github.com/apache/servicecomb-kie/pkg/model"
+ "github.com/apache/servicecomb-kie/server/cache"
+ "github.com/apache/servicecomb-kie/server/pubsub"
+ kvsvc "github.com/apache/servicecomb-kie/server/service/kv"
+ "github.com/go-chassis/openlog"
+ "github.com/hashicorp/serf/serf"
+)
+
+//KVHandler handler serf custom event, it is singleton
+type KVHandler struct {
+ BatchSize int
+ BatchInterval time.Duration
+ Immediate bool
+ pendingEvents sync.Map
+ pendingEventsCount int
+}
+
+func (h *KVHandler) RunFlushTask() {
+ for {
+ if h.pendingEventsCount >= h.BatchSize {
+ h.fireEvents()
+ }
+ <-time.After(h.BatchInterval)
+ h.fireEvents()
+ }
+
+}
+func (h *KVHandler) HandleEvent(e serf.Event) {
+ ue := e.(serf.UserEvent)
+ ke, err := pubsub.NewKVChangeEvent(ue.Payload)
+ if err != nil {
+ openlog.Error("invalid json:" + string(ue.Payload))
+ }
+ openlog.Debug("kv event:" + ke.Key)
+ if h.Immediate { //never retain event, not recommended
+ h.FindTopicAndFire(ke)
+ } else {
+ h.mergeAndSave(ke)
+ }
+
+}
+func (h *KVHandler) mergeAndSave(ke *pubsub.KVChangeEvent) {
+ id := ke.String()
+ _, ok := h.pendingEvents.Load(id)
+ if ok {
+ openlog.Debug("ignore same event: " + id)
+ return
+ }
+ h.pendingEvents.Store(id, ke)
+ h.pendingEventsCount++
+}
+func (h *KVHandler) fireEvents() {
+ h.pendingEvents.Range(func(key, value interface{}) bool {
+ ke := value.(*pubsub.KVChangeEvent)
+ h.FindTopicAndFire(ke)
+ h.pendingEvents.Delete(key)
+ h.pendingEventsCount--
+ return true
+ })
+}
+
+func (h *KVHandler) FindTopicAndFire(ke *pubsub.KVChangeEvent) {
+ topic := pubsub.Topics()
+ topic.Range(func(key, value interface{}) bool { //range all topics
+ t, err := pubsub.ParseTopic(key.(string))
+ if err != nil {
+ openlog.Error("can not parse topic " + key.(string) + ": " + err.Error())
+ return true
+ }
+ if t.Match(ke) {
+ prepareCache(key.(string), t)
+ notifyAndRemoveObservers(value, ke)
+ }
+ return true
+ })
+}
+
+func prepareCache(topicName string, topic *pubsub.Topic) {
+ rev, kvs, err := kvsvc.ListKV(context.TODO(), &model.ListKVRequest{
+ Domain: topic.DomainID,
+ Project: topic.Project,
+ Labels: topic.Labels,
+ Match: topic.MatchType,
+ })
+ if err != nil {
+ openlog.Error("can not query kvs:" + err.Error())
+ }
+ cache.CachedKV().Write(topicName, &cache.DBResult{
+ KVs: kvs,
+ Rev: rev,
+ Err: err,
+ })
+}
+
+func notifyAndRemoveObservers(value interface{}, ke *pubsub.KVChangeEvent) {
+ observers := value.(*sync.Map)
+ observers.Range(func(id, value interface{}) bool {
+ observer := value.(*pubsub.Observer)
+ observer.Event <- ke
+ observers.Delete(id)
+ return true
+ })
+}
+func init() {
+ h := &KVHandler{
+ BatchInterval: pubsub.DefaultEventBatchInterval,
+ BatchSize: pubsub.DefaultEventBatchSize,
+ Immediate: true,
+ }
+ pubsub.RegisterHandler(pubsub.EventKVChange, h)
+ go h.RunFlushTask()
+}
diff --git a/server/pubsub/struct.go b/server/pubsub/struct.go
index 1a2d5d9..34e4a0d 100644
--- a/server/pubsub/struct.go
+++ b/server/pubsub/struct.go
@@ -62,8 +62,8 @@ type Topic struct {
MatchType string `json:"match,omitempty"`
}
-//ParseTopicString parse topic string to topic struct
-func ParseTopicString(s string) (*Topic, error) {
+//ParseTopic parse topic string to topic struct
+func ParseTopic(s string) (*Topic, error) {
t := &Topic{
Labels: make(map[string]string),
}
diff --git a/server/pubsub/struct_test.go b/server/pubsub/struct_test.go
index 8069284..2ad99f2 100644
--- a/server/pubsub/struct_test.go
+++ b/server/pubsub/struct_test.go
@@ -49,6 +49,6 @@ func TestTopic_String(t *testing.T) {
t.Log(string(b))
mock := []byte(`{"labels":"a=b::c=d","domainID":"2","project":"1"}`)
- topic, _ = pubsub.ParseTopicString(string(mock))
+ topic, _ = pubsub.ParseTopic(string(mock))
t.Log(topic)
}
diff --git a/server/resource/v1/common.go b/server/resource/v1/common.go
index 0feab3f..c89085b 100644
--- a/server/resource/v1/common.go
+++ b/server/resource/v1/common.go
@@ -21,13 +21,15 @@ import (
"context"
"encoding/json"
"errors"
- "github.com/satori/go.uuid"
"net/http"
"strconv"
"strings"
"sync"
"time"
+ "github.com/apache/servicecomb-kie/server/cache"
+ "github.com/gofrs/uuid"
+
"github.com/apache/servicecomb-kie/server/datasource"
kvsvc "github.com/apache/servicecomb-kie/server/service/kv"
"github.com/go-chassis/cari/rbac"
@@ -55,8 +57,12 @@ const (
var observers = sync.Pool{
New: func() interface{} {
+ id, err := uuid.NewV4()
+ if err != nil {
+ openlog.Error("can not gen uuid")
+ }
return &pubsub.Observer{
- UUID: uuid.NewV4().String(),
+ UUID: id.String(),
Event: make(chan *pubsub.KVChangeEvent, 1),
}
},
@@ -187,24 +193,24 @@ func getMatchPattern(rctx *restful.Context) string {
}
return m
}
-func eventHappened(waitStr string, topic *pubsub.Topic) (bool, error) {
+func eventHappened(waitStr string, topic *pubsub.Topic) (bool, string, error) {
d, err := time.ParseDuration(waitStr)
if err != nil || d > common.MaxWait {
- return false, errors.New(common.MsgInvalidWait)
+ return false, "", errors.New(common.MsgInvalidWait)
}
happened := true
o := observers.Get().(*pubsub.Observer)
defer observers.Put(o)
- err = pubsub.AddObserver(o, topic)
+ topicName, err := pubsub.AddObserver(o, topic)
if err != nil {
- return false, errors.New("observe once failed: " + err.Error())
+ return false, "", errors.New("observe once failed: " + err.Error())
}
select {
case <-time.After(d):
happened = false
case <-o.Event:
}
- return happened, nil
+ return happened, topicName, nil
}
// size from 1 to start
@@ -254,7 +260,19 @@ func checkDomainAndProject(domain, project string) error {
}
return nil
}
-
+func queryFromCache(rctx *restful.Context, topic string) {
+ rev, kv, queryErr := cache.CachedKV().Read(topic)
+ if queryErr != nil {
+ WriteErrResponse(rctx, queryErr.Code, queryErr.Message)
+ return
+ }
+ rctx.ReadResponseWriter().Header().Set(common.HeaderRevision, strconv.FormatInt(rev, 10))
+ err := writeResponse(rctx, kv)
+ rctx.ReadRestfulRequest().SetAttribute(common.RespBodyContextKey, kv.Data)
+ if err != nil {
+ openlog.Error(err.Error())
+ }
+}
func queryAndResponse(rctx *restful.Context, request *model.ListKVRequest) {
rev, kv, queryErr := kvsvc.ListKV(rctx.Ctx, request)
if queryErr != nil {
diff --git a/server/resource/v1/history_resource_test.go
b/server/resource/v1/history_resource_test.go
index 69c691c..29cd07d 100644
--- a/server/resource/v1/history_resource_test.go
+++ b/server/resource/v1/history_resource_test.go
@@ -20,12 +20,13 @@ import (
"context"
"encoding/json"
"fmt"
- kvsvc "github.com/apache/servicecomb-kie/server/service/kv"
"io/ioutil"
"net/http"
"net/http/httptest"
"testing"
+ kvsvc "github.com/apache/servicecomb-kie/server/service/kv"
+
common2 "github.com/apache/servicecomb-kie/pkg/common"
"github.com/apache/servicecomb-kie/pkg/model"
diff --git a/server/resource/v1/kv_resource.go
b/server/resource/v1/kv_resource.go
index 0ac3242..dc709ff 100644
--- a/server/resource/v1/kv_resource.go
+++ b/server/resource/v1/kv_resource.go
@@ -244,7 +244,7 @@ func isLegalWaitRequest(rctx *restful.Context, request
*model.ListKVRequest) boo
return true
}
func watch(rctx *restful.Context, request *model.ListKVRequest, wait string)
bool {
- changed, err := eventHappened(wait, &pubsub.Topic{
+ changed, topic, err := eventHappened(wait, &pubsub.Topic{
Labels: request.Labels,
Project: request.Project,
MatchType: request.Match,
@@ -255,7 +255,7 @@ func watch(rctx *restful.Context, request
*model.ListKVRequest, wait string) boo
return true
}
if changed {
- queryAndResponse(rctx, request)
+ queryFromCache(rctx, topic)
return true
}
return false
diff --git a/server/service/kv/kv_svc.go b/server/service/kv/kv_svc.go
index f92c9b0..3ed06ec 100644
--- a/server/service/kv/kv_svc.go
+++ b/server/service/kv/kv_svc.go
@@ -33,14 +33,12 @@ import (
"github.com/go-chassis/foundation/validator"
"github.com/go-chassis/go-chassis/v2/pkg/backends/quota"
"github.com/go-chassis/openlog"
- "github.com/satori/go.uuid"
+ "github.com/gofrs/uuid"
)
-var sema = concurrency.NewSemaphore(concurrency.DefaultConcurrency)
+var listSema = concurrency.NewSemaphore(concurrency.DefaultConcurrency)
func ListKV(ctx context.Context, request *model.ListKVRequest) (int64,
*model.KVResponse, *errsvc.Error) {
- sema.Acquire()
- defer sema.Release()
opts := []datasource.FindOption{
datasource.WithKey(request.Key),
datasource.WithLabels(request.Labels),
@@ -58,7 +56,7 @@ func ListKV(ctx context.Context, request
*model.ListKVRequest) (int64, *model.KV
if err != nil {
return rev, nil, config.NewError(config.ErrInternal,
err.Error())
}
- kv, err := datasource.GetBroker().GetKVDao().List(ctx, request.Project, request.Domain, opts...)
+ kv, err := List(ctx, request.Project, request.Domain, opts...)
if err != nil {
openlog.Error("common: " + err.Error())
return rev, nil, config.NewError(config.ErrInternal,
common.MsgDBError)
@@ -108,7 +106,11 @@ func Create(ctx context.Context, kv *model.KVDoc)
(*model.KVDoc, *errsvc.Error)
openlog.Error(err.Error())
return nil, config.NewError(config.ErrInternal, "create kv
failed")
}
- completeKV(kv, revision)
+ err = completeKV(kv, revision)
+ if err != nil {
+ openlog.Error(err.Error())
+ return nil, config.NewError(config.ErrInternal, "create kv failed")
+ }
kv, err = datasource.GetBroker().GetKVDao().Create(ctx, kv)
if err != nil {
openlog.Error(fmt.Sprintf("post err:%s", err.Error()))
@@ -125,13 +127,18 @@ func Create(ctx context.Context, kv *model.KVDoc)
(*model.KVDoc, *errsvc.Error)
return kv, nil
}
-func completeKV(kv *model.KVDoc, revision int64) {
- kv.ID = uuid.NewV4().String()
+func completeKV(kv *model.KVDoc, revision int64) error {
+ id, err := uuid.NewV4()
+ if err != nil {
+ return err
+ }
+ kv.ID = id.String()
kv.UpdateRevision = revision
kv.CreateRevision = revision
now := time.Now().Unix()
kv.CreateTime = now
kv.UpdateTime = now
+ return nil
}
func Upload(ctx context.Context, request *model.UploadKVRequest)
*model.DocRespOfUpload {
@@ -282,5 +289,7 @@ func Get(ctx context.Context, req *model.GetKVRequest)
(*model.KVDoc, error) {
return datasource.GetBroker().GetKVDao().Get(ctx, req)
}
func List(ctx context.Context, project, domain string, options
...datasource.FindOption) (*model.KVResponse, error) {
+ listSema.Acquire()
+ defer listSema.Release()
return datasource.GetBroker().GetKVDao().List(ctx, project, domain,
options...)
}
diff --git a/test/benchmark/watch.go b/test/benchmark/watch.go
index 1c14ea6..f702adf 100644
--- a/test/benchmark/watch.go
+++ b/test/benchmark/watch.go
@@ -14,7 +14,7 @@ import (
var success int32
var fail int32
var total int32
-var clientNum = 10000
+var clientNum = 5000
var wg = sync.WaitGroup{}
var client = &http.Client{}
var start time.Time
@@ -77,6 +77,5 @@ func watch(req *http.Request) error {
log.Println(res.Status)
return errors.New("not OK")
}
-
return nil
}
diff --git a/test/init.go b/test/init.go
index 696486e..c037994 100644
--- a/test/init.go
+++ b/test/init.go
@@ -26,6 +26,7 @@ import (
_ "github.com/apache/servicecomb-kie/server/datasource/etcd"
_ "github.com/apache/servicecomb-kie/server/datasource/mongo"
+ _ "github.com/apache/servicecomb-kie/server/pubsub/notifier"
_ "github.com/go-chassis/go-chassis/v2/security/cipher/plugins/plain"
_ "github.com/little-cui/etcdadpt/embedded"
_ "github.com/little-cui/etcdadpt/remote"