This is an automated email from the ASF dual-hosted git repository.

littlecui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/servicecomb-kie.git


The following commit(s) were added to refs/heads/master by this push:
     new 24b40d7  optimize long polling performance (#210)
24b40d7 is described below

commit 24b40d78f892292fc78b39e64fdd0accf19c4a7a
Author: Shawn <[email protected]>
AuthorDate: Thu Aug 26 09:06:01 2021 +0800

    optimize long polling performance (#210)
---
 .github/workflows/etcd_storage.yml                 | 24 +++++--
 .github/workflows/mongo_storage.yml                |  4 +-
 docs/api.yaml                                      |  4 +-
 docs/development-guide/dev.md                      | 40 +++++++++++
 examples/dev/kie-conf.yaml                         | 25 +++----
 go.mod                                             |  2 +-
 pkg/concurrency/semaphore.go                       | 35 +++++++++
 pkg/util/util_test.go                              | 14 ++++
 .../datasource/mongo/track/polling_detail_dao.go   |  2 +-
 server/pubsub/bus.go                               | 19 +++--
 server/pubsub/bus_test.go                          |  5 +-
 server/pubsub/event_handler.go                     | 52 ++++++++++++--
 server/pubsub/struct.go                            | 18 ++---
 server/pubsub/struct_test.go                       |  7 +-
 server/resource/v1/common.go                       | 24 ++++---
 server/resource/v1/kv_resource.go                  | 49 +++++++------
 server/resource/v1/kv_resource_test.go             | 21 ++++--
 server/service/kv/kv_svc.go                        | 18 +++--
 test/benchmark/list.sh                             |  1 +
 test/benchmark/watch.go                            | 82 ++++++++++++++++++++++
 test/init.go                                       |  4 +-
 21 files changed, 354 insertions(+), 96 deletions(-)

diff --git a/.github/workflows/etcd_storage.yml 
b/.github/workflows/etcd_storage.yml
index 9a53a8c..c15f90f 100644
--- a/.github/workflows/etcd_storage.yml
+++ b/.github/workflows/etcd_storage.yml
@@ -13,15 +13,25 @@ jobs:
       uses: actions/checkout@v1
     - name: UT for etcd
       run: |
-        docker run -d -p 2379:2379 --name etcd quay.io/coreos/etcd etcd -name 
etcd --advertise-client-urls http://0.0.0.0:2379 --listen-client-urls 
http://0.0.0.0:2379
+        time docker run -d -p 2379:2379 --name etcd quay.io/coreos/etcd etcd 
-name etcd --advertise-client-urls http://0.0.0.0:2379 --listen-client-urls 
http://0.0.0.0:2379
         while ! nc -z 127.0.0.1 2379; do
           sleep 1
         done
         export TEST_DB_KIND=etcd
         export TEST_DB_URI=127.0.0.1:2379
-        go test $(go list ./... | grep -v mongo | grep -v third_party | grep 
-v examples) -cover -covermode atomic -coverprofile coverage.out
-    - name: UT for embedded etcd
-      run: |
-        export TEST_DB_KIND=embedded_etcd
-        export TEST_DB_URI=default=http://127.0.0.1:2380
-        go test $(go list ./... | grep -v mongo | grep -v third_party | grep 
-v examples) -cover -covermode atomic -coverprofile coverage.out
\ No newline at end of file
+        time go test $(go list ./... | grep -v mongo | grep -v third_party | 
grep -v examples)
+  embedded-etcd-storage:
+      runs-on: ubuntu-latest
+      steps:
+        - name: Set up Go
+          uses: actions/setup-go@v1
+          with:
+            go-version: 1.16
+          id: go
+        - name: Check out code into the Go module directory
+          uses: actions/checkout@v1
+        - name: UT for embedded etcd
+          run: |
+            export TEST_DB_KIND=embedded_etcd
+            export TEST_DB_URI=default=http://127.0.0.1:2380
+            go test $(go list ./... | grep -v mongo | grep -v third_party | 
grep -v examples)
\ No newline at end of file
diff --git a/.github/workflows/mongo_storage.yml 
b/.github/workflows/mongo_storage.yml
index 42bc194..1a34750 100644
--- a/.github/workflows/mongo_storage.yml
+++ b/.github/workflows/mongo_storage.yml
@@ -14,10 +14,10 @@ jobs:
     - name: UT
       run: |
         cd build
-        bash build_docker.sh
+        time bash build_docker.sh
         cd ../
         sudo docker-compose -f ./deployments/docker/docker-compose.yaml up -d
         sleep 20
         export TEST_DB_KIND=mongo
         export TEST_DB_URI=mongodb://kie:[email protected]:27017/kie
-        go test $(go list ./... | grep -v etcd |  grep -v third_party | grep 
-v examples) -cover -covermode atomic -coverprofile coverage.out
\ No newline at end of file
+        time go test -v $(go list ./... | grep -v etcd |  grep -v third_party 
| grep -v examples)
\ No newline at end of file
diff --git a/docs/api.yaml b/docs/api.yaml
index 8c00d5f..f9b3bab 100644
--- a/docs/api.yaml
+++ b/docs/api.yaml
@@ -1,7 +1,7 @@
 swagger: '2.0'
 info:
-  title: ''
-  version: ''
+  title: 'servicecomb-kie'
+  version: 'v1'
 basePath: /
 paths:
   /v1/health:
diff --git a/docs/development-guide/dev.md b/docs/development-guide/dev.md
new file mode 100644
index 0000000..f0e6dea
--- /dev/null
+++ b/docs/development-guide/dev.md
@@ -0,0 +1,40 @@
+# Preparing for Development
+
+### Setting Go
+follow the official website to install
+
+### Build
+```shell
+cd examples/dev
+go build github.com/apache/servicecomb-kie/cmd/kieserver
+```
+
+### Setting up Config File
+the most important thing is set the persistence storage kind, you can set it 
to "embedded_etcd", 
+so that you don't need to set up an etcd or mongodb 
+```shell
+vim examples/dev/kie-conf.yaml
+```
+```yaml
+db:
+  # kind can be mongo, etcd, embedded_etcd
+  kind: embedded_etcd
+  # uri is the db endpoints list
+  #   kind=mongo, then is the mongodb cluster's uri, e.g. 
mongodb://127.0.0.1:27017/kie
+  #   kind=etcd, then is the  remote etcd server's advertise-client-urls, e.g. 
http://127.0.0.1:2379
+  #   kind=embedded_etcd, then is the embedded etcd server's 
advertise-peer-urls, e.g. default=http://127.0.0.1:2380
+  #uri: mongodb://kie:[email protected]:27017/kie
+```
+### Setting up go chassis config file
+kie is developed base on go chassis and uses most of its features, go chassis 
need config file to launch itself
+
+```shell
+cd examples/dev/conf
+```
+move conf folder to same level with "kieserver" binary, if you move binary to  
"examples/dev", 
+you don't need to move or change those files
+### Run
+Now you can launch your config server
+```shell
+./kieserver --config kie-conf.yaml
+```
\ No newline at end of file
diff --git a/examples/dev/kie-conf.yaml b/examples/dev/kie-conf.yaml
index 0607d72..39a5b03 100644
--- a/examples/dev/kie-conf.yaml
+++ b/examples/dev/kie-conf.yaml
@@ -1,18 +1,19 @@
 db:
   # kind can be mongo, etcd, embedded_etcd
-  kind: mongo
+  kind: etcd
   # uri is the db endpoints list
   #   kind=mongo, then is the mongodb cluster's uri, e.g. 
mongodb://127.0.0.1:27017/kie
   #   kind=etcd, then is the  remote etcd server's advertise-client-urls, e.g. 
http://127.0.0.1:2379
   #   kind=embedded_etcd, then is the embedded etcd server's 
advertise-peer-urls, e.g. default=http://127.0.0.1:2380
-  uri: mongodb://kie:[email protected]:27017/kie
-  poolSize: 10
-  timeout: 5m
-  sslEnabled: false
-  rootCAFile: ./ssl/trust.cer
-  certFile: ./ssl/server.cer
-  keyFile: ./ssl/server_key.pem
-  certPwdFile: ./ssl/cert_pwd
-rbac:
-  enabled: false
-  rsaPublicKeyFile: ./examples/dev/public.key
\ No newline at end of file
+  #uri: mongodb://kie:[email protected]:27017/kie
+  uri: http://127.0.0.1:2380
+#  poolSize: 10
+#  timeout: 5m
+#  sslEnabled: false
+#  rootCAFile: ./ssl/trust.cer
+#  certFile: ./ssl/server.cer
+#  keyFile: ./ssl/server_key.pem
+#  certPwdFile: ./ssl/cert_pwd
+#rbac:
+#  enabled: false
+#  rsaPublicKeyFile: ./examples/dev/public.key
\ No newline at end of file
diff --git a/go.mod b/go.mod
index 79d3649..c8e0a93 100644
--- a/go.mod
+++ b/go.mod
@@ -18,4 +18,4 @@ require (
        gopkg.in/yaml.v2 v2.3.0
 )
 
-go 1.13
+go 1.16
diff --git a/pkg/concurrency/semaphore.go b/pkg/concurrency/semaphore.go
new file mode 100644
index 0000000..1dbf671
--- /dev/null
+++ b/pkg/concurrency/semaphore.go
@@ -0,0 +1,35 @@
+package concurrency
+
+import "math"
+
+const (
+       DefaultConcurrency = 500
+       MaxConcurrency     = math.MaxUint16
+)
+
+//Semaphore ctl the max concurrency
+type Semaphore struct {
+       tickets chan bool
+}
+
+//NewSemaphore accept concurrency number, not more than 65535
+func NewSemaphore(concurrency int) *Semaphore {
+       if concurrency >= math.MaxUint16 {
+               concurrency = MaxConcurrency
+       }
+       b := &Semaphore{
+               tickets: make(chan bool, concurrency),
+       }
+       for i := 0; i < concurrency; i++ {
+               b.tickets <- true
+       }
+       return b
+}
+func (b *Semaphore) Acquire() {
+       <-b.tickets
+}
+
+//Release return back signal
+func (b *Semaphore) Release() {
+       b.tickets <- true
+}
diff --git a/pkg/util/util_test.go b/pkg/util/util_test.go
index a98e0ed..44a0e0b 100644
--- a/pkg/util/util_test.go
+++ b/pkg/util/util_test.go
@@ -42,3 +42,17 @@ func TestIsEquivalentLabel(t *testing.T) {
        assert.Equal(t, util.IsEquivalentLabel(m3, m4), true)
        assert.Equal(t, util.IsEquivalentLabel(m3, m5), false)
 }
+func BenchmarkIsEquivalentLabel(b *testing.B) {
+       m1 := map[string]string{
+               "foo": "bar",
+               "a":   "b",
+       }
+       m2 := map[string]string{
+               "foo": "bar",
+               "c":   "d",
+               "s":   "d",
+       }
+       for i := 0; i < b.N; i++ {
+               util.IsEquivalentLabel(m1, m2)
+       }
+}
diff --git a/server/datasource/mongo/track/polling_detail_dao.go 
b/server/datasource/mongo/track/polling_detail_dao.go
index 9d71252..5a10590 100644
--- a/server/datasource/mongo/track/polling_detail_dao.go
+++ b/server/datasource/mongo/track/polling_detail_dao.go
@@ -24,7 +24,7 @@ import (
        "github.com/apache/servicecomb-kie/server/datasource"
        "github.com/apache/servicecomb-kie/server/datasource/mongo/session"
        "github.com/go-chassis/openlog"
-       uuid "github.com/satori/go.uuid"
+       "github.com/satori/go.uuid"
        "go.mongodb.org/mongo-driver/bson"
        "go.mongodb.org/mongo-driver/mongo"
 )
diff --git a/server/pubsub/bus.go b/server/pubsub/bus.go
index ab9925e..b8b2b18 100644
--- a/server/pubsub/bus.go
+++ b/server/pubsub/bus.go
@@ -18,9 +18,10 @@
 package pubsub
 
 import (
-       "encoding/json"
        "sync"
+       "time"
 
+       "encoding/json"
        "github.com/apache/servicecomb-kie/pkg/stringutil"
        "github.com/apache/servicecomb-kie/server/config"
        "github.com/go-chassis/openlog"
@@ -33,7 +34,9 @@ var bus *Bus
 
 //const
 const (
-       EventKVChange = "kv-changed"
+       EventKVChange             = "kv-chg"
+       DefaultEventBatchSize     = 5000
+       DefaultEventBatchInterval = 500 * time.Millisecond
 )
 
 var mutexObservers sync.RWMutex
@@ -84,7 +87,13 @@ func Start() {
                openlog.Fatal("can not sync key value change events to other 
kie nodes" + err.Error())
        }
        openlog.Info("kie message bus started")
-       bus.agent.RegisterEventHandler(&EventHandler{})
+       eh := &EventHandler{
+               BatchInterval: DefaultEventBatchInterval,
+               BatchSize:     DefaultEventBatchSize,
+               Immediate:     true,
+       }
+       go eh.RunFlushTask()
+       bus.agent.RegisterEventHandler(eh)
 }
 func join(addresses []string) error {
        _, err := bus.agent.Join(addresses, false)
@@ -104,8 +113,8 @@ func Publish(event *KVChangeEvent) error {
 
 }
 
-//ObserveOnce observe key changes by (key or labels) or (key and labels)
-func ObserveOnce(o *Observer, topic *Topic) error {
+//AddObserver observe key changes by (key or labels) or (key and labels)
+func AddObserver(o *Observer, topic *Topic) error {
        topic.LabelsFormat = stringutil.FormatMap(topic.Labels)
        b, err := json.Marshal(topic)
        if err != nil {
diff --git a/server/pubsub/bus_test.go b/server/pubsub/bus_test.go
index 809fe2b..09cfd1b 100644
--- a/server/pubsub/bus_test.go
+++ b/server/pubsub/bus_test.go
@@ -22,7 +22,7 @@ import (
 
        "github.com/apache/servicecomb-kie/server/config"
        "github.com/apache/servicecomb-kie/server/pubsub"
-       uuid "github.com/satori/go.uuid"
+       "github.com/satori/go.uuid"
 )
 
 func TestInit(t *testing.T) {
@@ -34,8 +34,7 @@ func TestInit(t *testing.T) {
                UUID:  uuid.NewV4().String(),
                Event: make(chan *pubsub.KVChangeEvent, 1),
        }
-       _ = pubsub.ObserveOnce(o, &pubsub.Topic{
-               Key:      "some_key",
+       _ = pubsub.AddObserver(o, &pubsub.Topic{
                Project:  "1",
                DomainID: "2",
                Labels: map[string]string{
diff --git a/server/pubsub/event_handler.go b/server/pubsub/event_handler.go
index 707bcce..1858b62 100644
--- a/server/pubsub/event_handler.go
+++ b/server/pubsub/event_handler.go
@@ -19,34 +19,78 @@ package pubsub
 
 import (
        "strings"
+       "sync"
+       "time"
 
        "github.com/go-chassis/openlog"
        "github.com/hashicorp/serf/serf"
 )
 
-//EventHandler handler serf custom event
+//EventHandler handler serf custom event, it is singleton
 type EventHandler struct {
+       BatchSize          int
+       BatchInterval      time.Duration
+       Immediate          bool
+       pendingEvents      sync.Map
+       pendingEventsCount int
 }
 
 //HandleEvent send event to subscribers
 func (h *EventHandler) HandleEvent(e serf.Event) {
-       openlog.Info("receive event:" + e.EventType().String())
+       openlog.Debug("receive event:" + e.EventType().String())
        switch e.EventType().String() {
        case "user":
                if strings.Contains(e.String(), EventKVChange) {
-                       handleKVEvent(e)
+                       h.handleKVEvent(e)
                }
        }
 
 }
+func (h *EventHandler) RunFlushTask() {
+       for {
+               if h.pendingEventsCount >= h.BatchSize {
+                       h.fireEvents()
+               }
+               <-time.After(h.BatchInterval)
+               h.fireEvents()
+       }
 
-func handleKVEvent(e serf.Event) {
+}
+func (h *EventHandler) handleKVEvent(e serf.Event) {
        ue := e.(serf.UserEvent)
        ke, err := NewKVChangeEvent(ue.Payload)
        if err != nil {
                openlog.Error("invalid json:" + string(ue.Payload))
        }
        openlog.Debug("kv event:" + ke.Key)
+       if h.Immediate { //never retain event, not recommended
+               h.FindTopicAndFire(ke)
+       } else {
+               h.mergeAndSave(ke)
+       }
+
+}
+func (h *EventHandler) mergeAndSave(ke *KVChangeEvent) {
+       id := ke.String()
+       _, ok := h.pendingEvents.Load(id)
+       if ok {
+               openlog.Debug("ignore same event: " + id)
+               return
+       }
+       h.pendingEvents.Store(id, ke)
+       h.pendingEventsCount++
+}
+func (h *EventHandler) fireEvents() {
+       h.pendingEvents.Range(func(key, value interface{}) bool {
+               ke := value.(*KVChangeEvent)
+               h.FindTopicAndFire(ke)
+               h.pendingEvents.Delete(key)
+               h.pendingEventsCount--
+               return true
+       })
+}
+
+func (h *EventHandler) FindTopicAndFire(ke *KVChangeEvent) {
        topics.Range(func(key, value interface{}) bool { //range all topics
                t, err := ParseTopicString(key.(string))
                if err != nil {
diff --git a/server/pubsub/struct.go b/server/pubsub/struct.go
index dd7d156..1a2d5d9 100644
--- a/server/pubsub/struct.go
+++ b/server/pubsub/struct.go
@@ -30,7 +30,7 @@ import (
 // const
 const (
        ActionPut    = "put"
-       ActionDelete = "delete"
+       ActionDelete = "del"
 )
 
 //KVChangeEvent is event between kie nodes, and broadcast by serf
@@ -42,6 +42,10 @@ type KVChangeEvent struct {
        Project  string
 }
 
+func (e *KVChangeEvent) String() string {
+       return strings.Join([]string{e.Key, e.Action, 
stringutil.FormatMap(e.Labels), e.DomainID, e.Project}, ";;")
+}
+
 //NewKVChangeEvent create a struct base on event payload
 func NewKVChangeEvent(payload []byte) (*KVChangeEvent, error) {
        ke := &KVChangeEvent{}
@@ -51,7 +55,6 @@ func NewKVChangeEvent(payload []byte) (*KVChangeEvent, error) 
{
 
 //Topic can be subscribe
 type Topic struct {
-       Key          string            `json:"key,omitempty"`
        Labels       map[string]string `json:"-"`
        LabelsFormat string            `json:"labels,omitempty"`
        DomainID     string            `json:"domainID,omitempty"`
@@ -92,11 +95,6 @@ func ParseTopicString(s string) (*Topic, error) {
 //update request labels or a subset of it.
 func (t *Topic) Match(event *KVChangeEvent) bool {
        match := false
-       if t.Key != "" {
-               if t.Key == event.Key {
-                       match = true
-               }
-       }
        if t.MatchType == common.PatternExact {
                if !util.IsEquivalentLabel(t.Labels, event.Labels) {
                        return false
@@ -116,8 +114,6 @@ func (t *Topic) Match(event *KVChangeEvent) bool {
 
 //Observer represents a client polling request
 type Observer struct {
-       UUID      string
-       RemoteIP  string
-       UserAgent string
-       Event     chan *KVChangeEvent
+       UUID  string
+       Event chan *KVChangeEvent
 }
diff --git a/server/pubsub/struct_test.go b/server/pubsub/struct_test.go
index f3fb349..8069284 100644
--- a/server/pubsub/struct_test.go
+++ b/server/pubsub/struct_test.go
@@ -26,7 +26,6 @@ import (
 
 func TestTopic_String(t *testing.T) {
        topic := &pubsub.Topic{
-               Key: "test",
                Labels: map[string]string{
                        "a": "b",
                        "c": "d",
@@ -44,14 +43,12 @@ func TestTopic_String(t *testing.T) {
        t.Log(topic)
        b, _ = json.Marshal(topic)
        t.Log(string(b))
-       topic = &pubsub.Topic{
-               Key: "test",
-       }
+       topic = &pubsub.Topic{}
        t.Log(topic)
        b, _ = json.Marshal(topic)
        t.Log(string(b))
 
-       mock := 
[]byte(`{"key":"some_key","labels":"a=b::c=d","domainID":"2","project":"1"}`)
+       mock := []byte(`{"labels":"a=b::c=d","domainID":"2","project":"1"}`)
        topic, _ = pubsub.ParseTopicString(string(mock))
        t.Log(topic)
 }
diff --git a/server/resource/v1/common.go b/server/resource/v1/common.go
index 82a3009..0feab3f 100644
--- a/server/resource/v1/common.go
+++ b/server/resource/v1/common.go
@@ -21,9 +21,11 @@ import (
        "context"
        "encoding/json"
        "errors"
+       "github.com/satori/go.uuid"
        "net/http"
        "strconv"
        "strings"
+       "sync"
        "time"
 
        "github.com/apache/servicecomb-kie/server/datasource"
@@ -38,7 +40,6 @@ import (
        "github.com/go-chassis/cari/config"
        "github.com/go-chassis/go-chassis/v2/server/restful"
        "github.com/go-chassis/openlog"
-       uuid "github.com/satori/go.uuid"
        "gopkg.in/yaml.v2"
 )
 
@@ -52,6 +53,15 @@ const (
        FmtReadRequestError = "decode request body failed: %v"
 )
 
+var observers = sync.Pool{
+       New: func() interface{} {
+               return &pubsub.Observer{
+                       UUID:  uuid.NewV4().String(),
+                       Event: make(chan *pubsub.KVChangeEvent, 1),
+               }
+       },
+}
+
 //err
 var (
        ErrInvalidRev = errors.New(common.MsgInvalidRev)
@@ -177,19 +187,15 @@ func getMatchPattern(rctx *restful.Context) string {
        }
        return m
 }
-func eventHappened(rctx *restful.Context, waitStr string, topic *pubsub.Topic) 
(bool, error) {
+func eventHappened(waitStr string, topic *pubsub.Topic) (bool, error) {
        d, err := time.ParseDuration(waitStr)
        if err != nil || d > common.MaxWait {
                return false, errors.New(common.MsgInvalidWait)
        }
        happened := true
-       o := &pubsub.Observer{
-               UUID:      uuid.NewV4().String(),
-               RemoteIP:  rctx.ReadRequest().RemoteAddr, //TODO x forward ip
-               UserAgent: rctx.ReadHeader(HeaderUserAgent),
-               Event:     make(chan *pubsub.KVChangeEvent, 1),
-       }
-       err = pubsub.ObserveOnce(o, topic)
+       o := observers.Get().(*pubsub.Observer)
+       defer observers.Put(o)
+       err = pubsub.AddObserver(o, topic)
        if err != nil {
                return false, errors.New("observe once failed: " + err.Error())
        }
diff --git a/server/resource/v1/kv_resource.go 
b/server/resource/v1/kv_resource.go
index 882dc87..0ac3242 100644
--- a/server/resource/v1/kv_resource.go
+++ b/server/resource/v1/kv_resource.go
@@ -201,21 +201,14 @@ func returnData(rctx *restful.Context, request 
*model.ListKVRequest) {
                        queryAndResponse(rctx, request)
                        return
                }
-               changed, err := eventHappened(rctx, wait, &pubsub.Topic{
-                       Labels:    request.Labels,
-                       Project:   request.Project,
-                       MatchType: request.Match,
-                       DomainID:  request.Domain,
-               })
-               if err != nil {
-                       WriteErrResponse(rctx, config.ErrObserveEvent, 
err.Error())
+               if !isLegalWaitRequest(rctx, request) {
                        return
                }
-               if changed {
-                       queryAndResponse(rctx, request)
+               if watch(rctx, request, wait) {
                        return
                }
                rctx.WriteHeader(http.StatusNotModified)
+               return
        } else {
                revised, err := isRevised(rctx.Ctx, revStr, request.Domain)
                if err != nil {
@@ -230,18 +223,10 @@ func returnData(rctx *restful.Context, request 
*model.ListKVRequest) {
                        queryAndResponse(rctx, request)
                        return
                } else if wait != "" {
-                       changed, err := eventHappened(rctx, wait, &pubsub.Topic{
-                               Labels:    request.Labels,
-                               Project:   request.Project,
-                               MatchType: request.Match,
-                               DomainID:  request.Domain,
-                       })
-                       if err != nil {
-                               WriteErrResponse(rctx, config.ErrObserveEvent, 
err.Error())
+                       if !isLegalWaitRequest(rctx, request) {
                                return
                        }
-                       if changed {
-                               queryAndResponse(rctx, request)
+                       if watch(rctx, request, wait) {
                                return
                        }
                        rctx.WriteHeader(http.StatusNotModified)
@@ -251,6 +236,30 @@ func returnData(rctx *restful.Context, request 
*model.ListKVRequest) {
                }
        }
 }
+func isLegalWaitRequest(rctx *restful.Context, request *model.ListKVRequest) 
bool {
+       if request.Key != "" {
+               WriteErrResponse(rctx, config.ErrInvalidParams, "can not accept 
key params, when using wait")
+               return false
+       }
+       return true
+}
+func watch(rctx *restful.Context, request *model.ListKVRequest, wait string) 
bool {
+       changed, err := eventHappened(wait, &pubsub.Topic{
+               Labels:    request.Labels,
+               Project:   request.Project,
+               MatchType: request.Match,
+               DomainID:  request.Domain,
+       })
+       if err != nil {
+               WriteErrResponse(rctx, config.ErrObserveEvent, err.Error())
+               return true
+       }
+       if changed {
+               queryAndResponse(rctx, request)
+               return true
+       }
+       return false
+}
 
 //Delete deletes one kv by id
 func (r *KVResource) Delete(rctx *restful.Context) {
diff --git a/server/resource/v1/kv_resource_test.go 
b/server/resource/v1/kv_resource_test.go
index bd44725..790183b 100644
--- a/server/resource/v1/kv_resource_test.go
+++ b/server/resource/v1/kv_resource_test.go
@@ -336,17 +336,28 @@ func TestKVResource_List(t *testing.T) {
                assert.Equal(t, http.StatusOK, resp.Result().StatusCode)
        })
        t.Run("list kv by service label, with wait and current rev param,should 
wait and return 304 ", func(t *testing.T) {
-               r, _ := http.NewRequest("GET", 
"/v1/kv_test/kie/kv?label=service:utService&wait=1s&"+common2.QueryParamRev+"="+rev,
 nil)
+               r, _ := http.NewRequest("GET", 
"/v1/kv_test/kie/kv?label=service:utService&"+common2.QueryParamRev+"=1", nil)
                r.Header.Set("Content-Type", "application/json")
                kvr := &v1.KVResource{}
                c, err := restfultest.New(kvr, nil)
                assert.NoError(t, err)
                resp := httptest.NewRecorder()
-               start := time.Now()
                c.ServeHTTP(resp, r)
-               duration := time.Since(start)
-               t.Log(duration)
-               assert.Equal(t, http.StatusNotModified, 
resp.Result().StatusCode)
+               assert.Equal(t, http.StatusOK, resp.Result().StatusCode)
+               rev = resp.Header().Get(common2.HeaderRevision)
+               t.Log(rev)
+
+               r2, _ := http.NewRequest("GET", 
"/v1/kv_test/kie/kv?label=service:utService&wait=2s&"+common2.QueryParamRev+"="+rev,
 nil)
+               r2.Header.Set("Content-Type", "application/json")
+               t.Log(r2.URL.String())
+               resp2 := httptest.NewRecorder()
+               c.ServeHTTP(resp2, r2)
+               rev = resp2.Header().Get(common2.HeaderRevision)
+               t.Log(rev)
+               body, err := ioutil.ReadAll(resp2.Body)
+               time.Sleep(1 * time.Second)
+               t.Log(string(body))
+               assert.Equal(t, http.StatusNotModified, 
resp2.Result().StatusCode)
        })
        t.Run("list kv by service label, with wait param,will exceed 1s and 
return 304", func(t *testing.T) {
                r, _ := http.NewRequest("GET", 
"/v1/kv_test/kie/kv?label=service:utService&wait=1s", nil)
diff --git a/server/service/kv/kv_svc.go b/server/service/kv/kv_svc.go
index d473bd1..f92c9b0 100644
--- a/server/service/kv/kv_svc.go
+++ b/server/service/kv/kv_svc.go
@@ -22,21 +22,25 @@ import (
        "fmt"
        "time"
 
-       "github.com/go-chassis/cari/config"
-       "github.com/go-chassis/cari/pkg/errsvc"
-       "github.com/go-chassis/foundation/validator"
-       "github.com/go-chassis/go-chassis/v2/pkg/backends/quota"
-       "github.com/go-chassis/openlog"
-       uuid "github.com/satori/go.uuid"
-
        "github.com/apache/servicecomb-kie/pkg/common"
+       "github.com/apache/servicecomb-kie/pkg/concurrency"
        "github.com/apache/servicecomb-kie/pkg/model"
        "github.com/apache/servicecomb-kie/pkg/stringutil"
        "github.com/apache/servicecomb-kie/server/datasource"
        "github.com/apache/servicecomb-kie/server/pubsub"
+       "github.com/go-chassis/cari/config"
+       "github.com/go-chassis/cari/pkg/errsvc"
+       "github.com/go-chassis/foundation/validator"
+       "github.com/go-chassis/go-chassis/v2/pkg/backends/quota"
+       "github.com/go-chassis/openlog"
+       "github.com/satori/go.uuid"
 )
 
+var sema = concurrency.NewSemaphore(concurrency.DefaultConcurrency)
+
 func ListKV(ctx context.Context, request *model.ListKVRequest) (int64, 
*model.KVResponse, *errsvc.Error) {
+       sema.Acquire()
+       defer sema.Release()
        opts := []datasource.FindOption{
                datasource.WithKey(request.Key),
                datasource.WithLabels(request.Labels),
diff --git a/test/benchmark/list.sh b/test/benchmark/list.sh
new file mode 100644
index 0000000..14feeff
--- /dev/null
+++ b/test/benchmark/list.sh
@@ -0,0 +1 @@
+ab -c 200 -n 20000 
http://127.0.0.1:30110/v1/default/kie/kv?label=app:default&label=env:test
\ No newline at end of file
diff --git a/test/benchmark/watch.go b/test/benchmark/watch.go
new file mode 100644
index 0000000..1c14ea6
--- /dev/null
+++ b/test/benchmark/watch.go
@@ -0,0 +1,82 @@
+package main
+
+import (
+       "crypto/tls"
+       "errors"
+       "log"
+       "net/http"
+       "net/url"
+       "sync"
+       "sync/atomic"
+       "time"
+)
+
+var success int32
+var fail int32
+var total int32
+var clientNum = 10000
+var wg = sync.WaitGroup{}
+var client = &http.Client{}
+var start time.Time
+
+func main() {
+       u := 
"http://127.0.0.1:30110/v1/default/kie/kv?label=app:default&label=env:test&wait=5m";
+       req, err := http.NewRequest("GET", u, nil)
+       if err != nil {
+               log.Panic(err)
+               return
+       }
+       uri, err := url.Parse(u)
+       if err != nil {
+               log.Panic(err)
+       }
+       wg.Add(clientNum)
+       client = &http.Client{
+               Timeout: 1 * time.Minute,
+       }
+       if uri.Scheme == "https" {
+               client.Transport = &http.Transport{
+                       TLSClientConfig: &tls.Config{
+                               InsecureSkipVerify: true,
+                       },
+               }
+       }
+       for n := 0; n < clientNum; n++ {
+               go func() {
+                       defer wg.Done()
+                       err = watch(req)
+                       if err != nil {
+                               atomic.AddInt32(&fail, 1)
+                               return
+                       }
+                       atomic.AddInt32(&success, 1)
+                       if total == 0 {
+                               start = time.Now()
+                               atomic.AddInt32(&total, 1)
+                       }
+               }()
+
+       }
+
+       wg.Wait()
+       duration := time.Since(start)
+       log.Printf("success %d", success)
+       log.Printf("fail %d", fail)
+       log.Printf("takes %s", duration.String())
+}
+
+func watch(req *http.Request) error {
+       res, err := client.Do(req)
+       if err != nil {
+               log.Println(err)
+               return err
+       }
+       defer res.Body.Close()
+
+       if res.Status != "200 OK" {
+               log.Println(res.Status)
+               return errors.New("not OK")
+       }
+
+       return nil
+}
diff --git a/test/init.go b/test/init.go
index c5a91ad..696486e 100644
--- a/test/init.go
+++ b/test/init.go
@@ -42,8 +42,8 @@ func init() {
        if err != nil {
                panic(err)
        }
-       kind := archaius.GetString("TEST_DB_KIND", "mongo")
-       uri := archaius.GetString("TEST_DB_URI", 
"mongodb://kie:[email protected]:27017/kie")
+       kind := archaius.GetString("TEST_DB_KIND", "etcd")
+       uri := archaius.GetString("TEST_DB_URI", "http://127.0.0.1:2379";)
        archaius.Init(archaius.WithMemorySource())
        archaius.Set("servicecomb.cipher.plugin", "default")
        cipher.Init()

Reply via email to