This is an automated email from the ASF dual-hosted git repository.
littlecui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/servicecomb-kie.git
The following commit(s) were added to refs/heads/master by this push:
new 24b40d7 optimize long polling performance (#210)
24b40d7 is described below
commit 24b40d78f892292fc78b39e64fdd0accf19c4a7a
Author: Shawn <[email protected]>
AuthorDate: Thu Aug 26 09:06:01 2021 +0800
optimize long polling performance (#210)
---
.github/workflows/etcd_storage.yml | 24 +++++--
.github/workflows/mongo_storage.yml | 4 +-
docs/api.yaml | 4 +-
docs/development-guide/dev.md | 40 +++++++++++
examples/dev/kie-conf.yaml | 25 +++----
go.mod | 2 +-
pkg/concurrency/semaphore.go | 35 +++++++++
pkg/util/util_test.go | 14 ++++
.../datasource/mongo/track/polling_detail_dao.go | 2 +-
server/pubsub/bus.go | 19 +++--
server/pubsub/bus_test.go | 5 +-
server/pubsub/event_handler.go | 52 ++++++++++++--
server/pubsub/struct.go | 18 ++---
server/pubsub/struct_test.go | 7 +-
server/resource/v1/common.go | 24 ++++---
server/resource/v1/kv_resource.go | 49 +++++++------
server/resource/v1/kv_resource_test.go | 21 ++++--
server/service/kv/kv_svc.go | 18 +++--
test/benchmark/list.sh | 1 +
test/benchmark/watch.go | 82 ++++++++++++++++++++++
test/init.go | 4 +-
21 files changed, 354 insertions(+), 96 deletions(-)
diff --git a/.github/workflows/etcd_storage.yml b/.github/workflows/etcd_storage.yml
index 9a53a8c..c15f90f 100644
--- a/.github/workflows/etcd_storage.yml
+++ b/.github/workflows/etcd_storage.yml
@@ -13,15 +13,25 @@ jobs:
uses: actions/checkout@v1
- name: UT for etcd
run: |
- docker run -d -p 2379:2379 --name etcd quay.io/coreos/etcd etcd -name
etcd --advertise-client-urls http://0.0.0.0:2379 --listen-client-urls
http://0.0.0.0:2379
+ time docker run -d -p 2379:2379 --name etcd quay.io/coreos/etcd etcd
-name etcd --advertise-client-urls http://0.0.0.0:2379 --listen-client-urls
http://0.0.0.0:2379
while ! nc -z 127.0.0.1 2379; do
sleep 1
done
export TEST_DB_KIND=etcd
export TEST_DB_URI=127.0.0.1:2379
- go test $(go list ./... | grep -v mongo | grep -v third_party | grep
-v examples) -cover -covermode atomic -coverprofile coverage.out
- - name: UT for embedded etcd
- run: |
- export TEST_DB_KIND=embedded_etcd
- export TEST_DB_URI=default=http://127.0.0.1:2380
- go test $(go list ./... | grep -v mongo | grep -v third_party | grep
-v examples) -cover -covermode atomic -coverprofile coverage.out
\ No newline at end of file
+ time go test $(go list ./... | grep -v mongo | grep -v third_party |
grep -v examples)
+ embedded-etcd-storage:
+ runs-on: ubuntu-latest
+ steps:
+ - name: Set up Go
+ uses: actions/setup-go@v1
+ with:
+ go-version: 1.16
+ id: go
+ - name: Check out code into the Go module directory
+ uses: actions/checkout@v1
+ - name: UT for embedded etcd
+ run: |
+ export TEST_DB_KIND=embedded_etcd
+ export TEST_DB_URI=default=http://127.0.0.1:2380
+ go test $(go list ./... | grep -v mongo | grep -v third_party |
grep -v examples)
\ No newline at end of file
diff --git a/.github/workflows/mongo_storage.yml b/.github/workflows/mongo_storage.yml
index 42bc194..1a34750 100644
--- a/.github/workflows/mongo_storage.yml
+++ b/.github/workflows/mongo_storage.yml
@@ -14,10 +14,10 @@ jobs:
- name: UT
run: |
cd build
- bash build_docker.sh
+ time bash build_docker.sh
cd ../
sudo docker-compose -f ./deployments/docker/docker-compose.yaml up -d
sleep 20
export TEST_DB_KIND=mongo
export TEST_DB_URI=mongodb://kie:[email protected]:27017/kie
- go test $(go list ./... | grep -v etcd | grep -v third_party | grep
-v examples) -cover -covermode atomic -coverprofile coverage.out
\ No newline at end of file
+ time go test -v $(go list ./... | grep -v etcd | grep -v third_party
| grep -v examples)
\ No newline at end of file
diff --git a/docs/api.yaml b/docs/api.yaml
index 8c00d5f..f9b3bab 100644
--- a/docs/api.yaml
+++ b/docs/api.yaml
@@ -1,7 +1,7 @@
swagger: '2.0'
info:
- title: ''
- version: ''
+ title: 'servicecomb-kie'
+ version: 'v1'
basePath: /
paths:
/v1/health:
diff --git a/docs/development-guide/dev.md b/docs/development-guide/dev.md
new file mode 100644
index 0000000..f0e6dea
--- /dev/null
+++ b/docs/development-guide/dev.md
@@ -0,0 +1,40 @@
+# Preparing for Development
+
+### Setting Go
+Follow the instructions on the official Go website to install Go.
+
+### Build
+```shell
+cd examples/dev
+go build github.com/apache/servicecomb-kie/cmd/kieserver
+```
+
+### Setting up Config File
+The most important thing is to set the persistent storage kind. You can set it to "embedded_etcd",
+so that you don't need to set up etcd or MongoDB.
+```shell
+vim examples/dev/kie-conf.yaml
+```
+```yaml
+db:
+ # kind can be mongo, etcd, embedded_etcd
+ kind: embedded_etcd
+ # uri is the db endpoints list
+ # kind=mongo, then is the mongodb cluster's uri, e.g. mongodb://127.0.0.1:27017/kie
+ # kind=etcd, then is the remote etcd server's advertise-client-urls, e.g. http://127.0.0.1:2379
+ # kind=embedded_etcd, then is the embedded etcd server's advertise-peer-urls, e.g. default=http://127.0.0.1:2380
+ #uri: mongodb://kie:[email protected]:27017/kie
+```
+### Setting up go chassis config file
+kie is built on go chassis and uses most of its features; go chassis needs a config file to launch itself.
+
+```shell
+cd examples/dev/conf
+```
+Move the conf folder to the same level as the "kieserver" binary. If you move the binary to "examples/dev",
+you don't need to move or change those files.
+### Run
+Now you can launch your config server
+```shell
+./kieserver --config kie-conf.yaml
+```
\ No newline at end of file
diff --git a/examples/dev/kie-conf.yaml b/examples/dev/kie-conf.yaml
index 0607d72..39a5b03 100644
--- a/examples/dev/kie-conf.yaml
+++ b/examples/dev/kie-conf.yaml
@@ -1,18 +1,19 @@
db:
# kind can be mongo, etcd, embedded_etcd
- kind: mongo
+ kind: etcd
# uri is the db endpoints list
# kind=mongo, then is the mongodb cluster's uri, e.g.
mongodb://127.0.0.1:27017/kie
# kind=etcd, then is the remote etcd server's advertise-client-urls, e.g.
http://127.0.0.1:2379
# kind=embedded_etcd, then is the embedded etcd server's
advertise-peer-urls, e.g. default=http://127.0.0.1:2380
- uri: mongodb://kie:[email protected]:27017/kie
- poolSize: 10
- timeout: 5m
- sslEnabled: false
- rootCAFile: ./ssl/trust.cer
- certFile: ./ssl/server.cer
- keyFile: ./ssl/server_key.pem
- certPwdFile: ./ssl/cert_pwd
-rbac:
- enabled: false
- rsaPublicKeyFile: ./examples/dev/public.key
\ No newline at end of file
+ #uri: mongodb://kie:[email protected]:27017/kie
+ uri: http://127.0.0.1:2380
+# poolSize: 10
+# timeout: 5m
+# sslEnabled: false
+# rootCAFile: ./ssl/trust.cer
+# certFile: ./ssl/server.cer
+# keyFile: ./ssl/server_key.pem
+# certPwdFile: ./ssl/cert_pwd
+#rbac:
+# enabled: false
+# rsaPublicKeyFile: ./examples/dev/public.key
\ No newline at end of file
diff --git a/go.mod b/go.mod
index 79d3649..c8e0a93 100644
--- a/go.mod
+++ b/go.mod
@@ -18,4 +18,4 @@ require (
gopkg.in/yaml.v2 v2.3.0
)
-go 1.13
+go 1.16
diff --git a/pkg/concurrency/semaphore.go b/pkg/concurrency/semaphore.go
new file mode 100644
index 0000000..1dbf671
--- /dev/null
+++ b/pkg/concurrency/semaphore.go
@@ -0,0 +1,35 @@
+package concurrency
+
+import "math"
+
+const (
+ DefaultConcurrency = 500
+ MaxConcurrency = math.MaxUint16
+)
+
+//Semaphore ctl the max concurrency
+type Semaphore struct {
+ tickets chan bool
+}
+
+//NewSemaphore accept concurrency number, not more than 65535
+func NewSemaphore(concurrency int) *Semaphore {
+ if concurrency >= math.MaxUint16 {
+ concurrency = MaxConcurrency
+ }
+ b := &Semaphore{
+ tickets: make(chan bool, concurrency),
+ }
+ for i := 0; i < concurrency; i++ {
+ b.tickets <- true
+ }
+ return b
+}
+func (b *Semaphore) Acquire() {
+ <-b.tickets
+}
+
+//Release return back signal
+func (b *Semaphore) Release() {
+ b.tickets <- true
+}
diff --git a/pkg/util/util_test.go b/pkg/util/util_test.go
index a98e0ed..44a0e0b 100644
--- a/pkg/util/util_test.go
+++ b/pkg/util/util_test.go
@@ -42,3 +42,17 @@ func TestIsEquivalentLabel(t *testing.T) {
assert.Equal(t, util.IsEquivalentLabel(m3, m4), true)
assert.Equal(t, util.IsEquivalentLabel(m3, m5), false)
}
+func BenchmarkIsEquivalentLabel(b *testing.B) {
+ m1 := map[string]string{
+ "foo": "bar",
+ "a": "b",
+ }
+ m2 := map[string]string{
+ "foo": "bar",
+ "c": "d",
+ "s": "d",
+ }
+ for i := 0; i < b.N; i++ {
+ util.IsEquivalentLabel(m1, m2)
+ }
+}
diff --git a/server/datasource/mongo/track/polling_detail_dao.go b/server/datasource/mongo/track/polling_detail_dao.go
index 9d71252..5a10590 100644
--- a/server/datasource/mongo/track/polling_detail_dao.go
+++ b/server/datasource/mongo/track/polling_detail_dao.go
@@ -24,7 +24,7 @@ import (
"github.com/apache/servicecomb-kie/server/datasource"
"github.com/apache/servicecomb-kie/server/datasource/mongo/session"
"github.com/go-chassis/openlog"
- uuid "github.com/satori/go.uuid"
+ "github.com/satori/go.uuid"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
)
diff --git a/server/pubsub/bus.go b/server/pubsub/bus.go
index ab9925e..b8b2b18 100644
--- a/server/pubsub/bus.go
+++ b/server/pubsub/bus.go
@@ -18,9 +18,10 @@
package pubsub
import (
- "encoding/json"
"sync"
+ "time"
+ "encoding/json"
"github.com/apache/servicecomb-kie/pkg/stringutil"
"github.com/apache/servicecomb-kie/server/config"
"github.com/go-chassis/openlog"
@@ -33,7 +34,9 @@ var bus *Bus
//const
const (
- EventKVChange = "kv-changed"
+ EventKVChange = "kv-chg"
+ DefaultEventBatchSize = 5000
+ DefaultEventBatchInterval = 500 * time.Millisecond
)
var mutexObservers sync.RWMutex
@@ -84,7 +87,13 @@ func Start() {
openlog.Fatal("can not sync key value change events to other
kie nodes" + err.Error())
}
openlog.Info("kie message bus started")
- bus.agent.RegisterEventHandler(&EventHandler{})
+ eh := &EventHandler{
+ BatchInterval: DefaultEventBatchInterval,
+ BatchSize: DefaultEventBatchSize,
+ Immediate: true,
+ }
+ go eh.RunFlushTask()
+ bus.agent.RegisterEventHandler(eh)
}
func join(addresses []string) error {
_, err := bus.agent.Join(addresses, false)
@@ -104,8 +113,8 @@ func Publish(event *KVChangeEvent) error {
}
-//ObserveOnce observe key changes by (key or labels) or (key and labels)
-func ObserveOnce(o *Observer, topic *Topic) error {
+//AddObserver observe key changes by (key or labels) or (key and labels)
+func AddObserver(o *Observer, topic *Topic) error {
topic.LabelsFormat = stringutil.FormatMap(topic.Labels)
b, err := json.Marshal(topic)
if err != nil {
diff --git a/server/pubsub/bus_test.go b/server/pubsub/bus_test.go
index 809fe2b..09cfd1b 100644
--- a/server/pubsub/bus_test.go
+++ b/server/pubsub/bus_test.go
@@ -22,7 +22,7 @@ import (
"github.com/apache/servicecomb-kie/server/config"
"github.com/apache/servicecomb-kie/server/pubsub"
- uuid "github.com/satori/go.uuid"
+ "github.com/satori/go.uuid"
)
func TestInit(t *testing.T) {
@@ -34,8 +34,7 @@ func TestInit(t *testing.T) {
UUID: uuid.NewV4().String(),
Event: make(chan *pubsub.KVChangeEvent, 1),
}
- _ = pubsub.ObserveOnce(o, &pubsub.Topic{
- Key: "some_key",
+ _ = pubsub.AddObserver(o, &pubsub.Topic{
Project: "1",
DomainID: "2",
Labels: map[string]string{
diff --git a/server/pubsub/event_handler.go b/server/pubsub/event_handler.go
index 707bcce..1858b62 100644
--- a/server/pubsub/event_handler.go
+++ b/server/pubsub/event_handler.go
@@ -19,34 +19,78 @@ package pubsub
import (
"strings"
+ "sync"
+ "time"
"github.com/go-chassis/openlog"
"github.com/hashicorp/serf/serf"
)
-//EventHandler handler serf custom event
+//EventHandler handler serf custom event, it is singleton
type EventHandler struct {
+ BatchSize int
+ BatchInterval time.Duration
+ Immediate bool
+ pendingEvents sync.Map
+ pendingEventsCount int
}
//HandleEvent send event to subscribers
func (h *EventHandler) HandleEvent(e serf.Event) {
- openlog.Info("receive event:" + e.EventType().String())
+ openlog.Debug("receive event:" + e.EventType().String())
switch e.EventType().String() {
case "user":
if strings.Contains(e.String(), EventKVChange) {
- handleKVEvent(e)
+ h.handleKVEvent(e)
}
}
}
+func (h *EventHandler) RunFlushTask() {
+ for {
+ if h.pendingEventsCount >= h.BatchSize {
+ h.fireEvents()
+ }
+ <-time.After(h.BatchInterval)
+ h.fireEvents()
+ }
-func handleKVEvent(e serf.Event) {
+}
+func (h *EventHandler) handleKVEvent(e serf.Event) {
ue := e.(serf.UserEvent)
ke, err := NewKVChangeEvent(ue.Payload)
if err != nil {
openlog.Error("invalid json:" + string(ue.Payload))
}
openlog.Debug("kv event:" + ke.Key)
+ if h.Immediate { //never retain event, not recommended
+ h.FindTopicAndFire(ke)
+ } else {
+ h.mergeAndSave(ke)
+ }
+
+}
+func (h *EventHandler) mergeAndSave(ke *KVChangeEvent) {
+ id := ke.String()
+ _, ok := h.pendingEvents.Load(id)
+ if ok {
+ openlog.Debug("ignore same event: " + id)
+ return
+ }
+ h.pendingEvents.Store(id, ke)
+ h.pendingEventsCount++
+}
+func (h *EventHandler) fireEvents() {
+ h.pendingEvents.Range(func(key, value interface{}) bool {
+ ke := value.(*KVChangeEvent)
+ h.FindTopicAndFire(ke)
+ h.pendingEvents.Delete(key)
+ h.pendingEventsCount--
+ return true
+ })
+}
+
+func (h *EventHandler) FindTopicAndFire(ke *KVChangeEvent) {
topics.Range(func(key, value interface{}) bool { //range all topics
t, err := ParseTopicString(key.(string))
if err != nil {
diff --git a/server/pubsub/struct.go b/server/pubsub/struct.go
index dd7d156..1a2d5d9 100644
--- a/server/pubsub/struct.go
+++ b/server/pubsub/struct.go
@@ -30,7 +30,7 @@ import (
// const
const (
ActionPut = "put"
- ActionDelete = "delete"
+ ActionDelete = "del"
)
//KVChangeEvent is event between kie nodes, and broadcast by serf
@@ -42,6 +42,10 @@ type KVChangeEvent struct {
Project string
}
+func (e *KVChangeEvent) String() string {
+ return strings.Join([]string{e.Key, e.Action,
stringutil.FormatMap(e.Labels), e.DomainID, e.Project}, ";;")
+}
+
//NewKVChangeEvent create a struct base on event payload
func NewKVChangeEvent(payload []byte) (*KVChangeEvent, error) {
ke := &KVChangeEvent{}
@@ -51,7 +55,6 @@ func NewKVChangeEvent(payload []byte) (*KVChangeEvent, error)
{
//Topic can be subscribe
type Topic struct {
- Key string `json:"key,omitempty"`
Labels map[string]string `json:"-"`
LabelsFormat string `json:"labels,omitempty"`
DomainID string `json:"domainID,omitempty"`
@@ -92,11 +95,6 @@ func ParseTopicString(s string) (*Topic, error) {
//update request labels or a subset of it.
func (t *Topic) Match(event *KVChangeEvent) bool {
match := false
- if t.Key != "" {
- if t.Key == event.Key {
- match = true
- }
- }
if t.MatchType == common.PatternExact {
if !util.IsEquivalentLabel(t.Labels, event.Labels) {
return false
@@ -116,8 +114,6 @@ func (t *Topic) Match(event *KVChangeEvent) bool {
//Observer represents a client polling request
type Observer struct {
- UUID string
- RemoteIP string
- UserAgent string
- Event chan *KVChangeEvent
+ UUID string
+ Event chan *KVChangeEvent
}
diff --git a/server/pubsub/struct_test.go b/server/pubsub/struct_test.go
index f3fb349..8069284 100644
--- a/server/pubsub/struct_test.go
+++ b/server/pubsub/struct_test.go
@@ -26,7 +26,6 @@ import (
func TestTopic_String(t *testing.T) {
topic := &pubsub.Topic{
- Key: "test",
Labels: map[string]string{
"a": "b",
"c": "d",
@@ -44,14 +43,12 @@ func TestTopic_String(t *testing.T) {
t.Log(topic)
b, _ = json.Marshal(topic)
t.Log(string(b))
- topic = &pubsub.Topic{
- Key: "test",
- }
+ topic = &pubsub.Topic{}
t.Log(topic)
b, _ = json.Marshal(topic)
t.Log(string(b))
- mock :=
[]byte(`{"key":"some_key","labels":"a=b::c=d","domainID":"2","project":"1"}`)
+ mock := []byte(`{"labels":"a=b::c=d","domainID":"2","project":"1"}`)
topic, _ = pubsub.ParseTopicString(string(mock))
t.Log(topic)
}
diff --git a/server/resource/v1/common.go b/server/resource/v1/common.go
index 82a3009..0feab3f 100644
--- a/server/resource/v1/common.go
+++ b/server/resource/v1/common.go
@@ -21,9 +21,11 @@ import (
"context"
"encoding/json"
"errors"
+ "github.com/satori/go.uuid"
"net/http"
"strconv"
"strings"
+ "sync"
"time"
"github.com/apache/servicecomb-kie/server/datasource"
@@ -38,7 +40,6 @@ import (
"github.com/go-chassis/cari/config"
"github.com/go-chassis/go-chassis/v2/server/restful"
"github.com/go-chassis/openlog"
- uuid "github.com/satori/go.uuid"
"gopkg.in/yaml.v2"
)
@@ -52,6 +53,15 @@ const (
FmtReadRequestError = "decode request body failed: %v"
)
+var observers = sync.Pool{
+ New: func() interface{} {
+ return &pubsub.Observer{
+ UUID: uuid.NewV4().String(),
+ Event: make(chan *pubsub.KVChangeEvent, 1),
+ }
+ },
+}
+
//err
var (
ErrInvalidRev = errors.New(common.MsgInvalidRev)
@@ -177,19 +187,15 @@ func getMatchPattern(rctx *restful.Context) string {
}
return m
}
-func eventHappened(rctx *restful.Context, waitStr string, topic *pubsub.Topic)
(bool, error) {
+func eventHappened(waitStr string, topic *pubsub.Topic) (bool, error) {
d, err := time.ParseDuration(waitStr)
if err != nil || d > common.MaxWait {
return false, errors.New(common.MsgInvalidWait)
}
happened := true
- o := &pubsub.Observer{
- UUID: uuid.NewV4().String(),
- RemoteIP: rctx.ReadRequest().RemoteAddr, //TODO x forward ip
- UserAgent: rctx.ReadHeader(HeaderUserAgent),
- Event: make(chan *pubsub.KVChangeEvent, 1),
- }
- err = pubsub.ObserveOnce(o, topic)
+ o := observers.Get().(*pubsub.Observer)
+ defer observers.Put(o)
+ err = pubsub.AddObserver(o, topic)
if err != nil {
return false, errors.New("observe once failed: " + err.Error())
}
diff --git a/server/resource/v1/kv_resource.go b/server/resource/v1/kv_resource.go
index 882dc87..0ac3242 100644
--- a/server/resource/v1/kv_resource.go
+++ b/server/resource/v1/kv_resource.go
@@ -201,21 +201,14 @@ func returnData(rctx *restful.Context, request
*model.ListKVRequest) {
queryAndResponse(rctx, request)
return
}
- changed, err := eventHappened(rctx, wait, &pubsub.Topic{
- Labels: request.Labels,
- Project: request.Project,
- MatchType: request.Match,
- DomainID: request.Domain,
- })
- if err != nil {
- WriteErrResponse(rctx, config.ErrObserveEvent,
err.Error())
+ if !isLegalWaitRequest(rctx, request) {
return
}
- if changed {
- queryAndResponse(rctx, request)
+ if watch(rctx, request, wait) {
return
}
rctx.WriteHeader(http.StatusNotModified)
+ return
} else {
revised, err := isRevised(rctx.Ctx, revStr, request.Domain)
if err != nil {
@@ -230,18 +223,10 @@ func returnData(rctx *restful.Context, request
*model.ListKVRequest) {
queryAndResponse(rctx, request)
return
} else if wait != "" {
- changed, err := eventHappened(rctx, wait, &pubsub.Topic{
- Labels: request.Labels,
- Project: request.Project,
- MatchType: request.Match,
- DomainID: request.Domain,
- })
- if err != nil {
- WriteErrResponse(rctx, config.ErrObserveEvent,
err.Error())
+ if !isLegalWaitRequest(rctx, request) {
return
}
- if changed {
- queryAndResponse(rctx, request)
+ if watch(rctx, request, wait) {
return
}
rctx.WriteHeader(http.StatusNotModified)
@@ -251,6 +236,30 @@ func returnData(rctx *restful.Context, request
*model.ListKVRequest) {
}
}
}
+func isLegalWaitRequest(rctx *restful.Context, request *model.ListKVRequest)
bool {
+ if request.Key != "" {
+ WriteErrResponse(rctx, config.ErrInvalidParams, "can not accept
key params, when using wait")
+ return false
+ }
+ return true
+}
+func watch(rctx *restful.Context, request *model.ListKVRequest, wait string)
bool {
+ changed, err := eventHappened(wait, &pubsub.Topic{
+ Labels: request.Labels,
+ Project: request.Project,
+ MatchType: request.Match,
+ DomainID: request.Domain,
+ })
+ if err != nil {
+ WriteErrResponse(rctx, config.ErrObserveEvent, err.Error())
+ return true
+ }
+ if changed {
+ queryAndResponse(rctx, request)
+ return true
+ }
+ return false
+}
//Delete deletes one kv by id
func (r *KVResource) Delete(rctx *restful.Context) {
diff --git a/server/resource/v1/kv_resource_test.go b/server/resource/v1/kv_resource_test.go
index bd44725..790183b 100644
--- a/server/resource/v1/kv_resource_test.go
+++ b/server/resource/v1/kv_resource_test.go
@@ -336,17 +336,28 @@ func TestKVResource_List(t *testing.T) {
assert.Equal(t, http.StatusOK, resp.Result().StatusCode)
})
t.Run("list kv by service label, with wait and current rev param,should
wait and return 304 ", func(t *testing.T) {
- r, _ := http.NewRequest("GET",
"/v1/kv_test/kie/kv?label=service:utService&wait=1s&"+common2.QueryParamRev+"="+rev,
nil)
+ r, _ := http.NewRequest("GET",
"/v1/kv_test/kie/kv?label=service:utService&"+common2.QueryParamRev+"=1", nil)
r.Header.Set("Content-Type", "application/json")
kvr := &v1.KVResource{}
c, err := restfultest.New(kvr, nil)
assert.NoError(t, err)
resp := httptest.NewRecorder()
- start := time.Now()
c.ServeHTTP(resp, r)
- duration := time.Since(start)
- t.Log(duration)
- assert.Equal(t, http.StatusNotModified,
resp.Result().StatusCode)
+ assert.Equal(t, http.StatusOK, resp.Result().StatusCode)
+ rev = resp.Header().Get(common2.HeaderRevision)
+ t.Log(rev)
+
+ r2, _ := http.NewRequest("GET",
"/v1/kv_test/kie/kv?label=service:utService&wait=2s&"+common2.QueryParamRev+"="+rev,
nil)
+ r2.Header.Set("Content-Type", "application/json")
+ t.Log(r2.URL.String())
+ resp2 := httptest.NewRecorder()
+ c.ServeHTTP(resp2, r2)
+ rev = resp2.Header().Get(common2.HeaderRevision)
+ t.Log(rev)
+ body, err := ioutil.ReadAll(resp2.Body)
+ time.Sleep(1 * time.Second)
+ t.Log(string(body))
+ assert.Equal(t, http.StatusNotModified,
resp2.Result().StatusCode)
})
t.Run("list kv by service label, with wait param,will exceed 1s and
return 304", func(t *testing.T) {
r, _ := http.NewRequest("GET",
"/v1/kv_test/kie/kv?label=service:utService&wait=1s", nil)
diff --git a/server/service/kv/kv_svc.go b/server/service/kv/kv_svc.go
index d473bd1..f92c9b0 100644
--- a/server/service/kv/kv_svc.go
+++ b/server/service/kv/kv_svc.go
@@ -22,21 +22,25 @@ import (
"fmt"
"time"
- "github.com/go-chassis/cari/config"
- "github.com/go-chassis/cari/pkg/errsvc"
- "github.com/go-chassis/foundation/validator"
- "github.com/go-chassis/go-chassis/v2/pkg/backends/quota"
- "github.com/go-chassis/openlog"
- uuid "github.com/satori/go.uuid"
-
"github.com/apache/servicecomb-kie/pkg/common"
+ "github.com/apache/servicecomb-kie/pkg/concurrency"
"github.com/apache/servicecomb-kie/pkg/model"
"github.com/apache/servicecomb-kie/pkg/stringutil"
"github.com/apache/servicecomb-kie/server/datasource"
"github.com/apache/servicecomb-kie/server/pubsub"
+ "github.com/go-chassis/cari/config"
+ "github.com/go-chassis/cari/pkg/errsvc"
+ "github.com/go-chassis/foundation/validator"
+ "github.com/go-chassis/go-chassis/v2/pkg/backends/quota"
+ "github.com/go-chassis/openlog"
+ "github.com/satori/go.uuid"
)
+var sema = concurrency.NewSemaphore(concurrency.DefaultConcurrency)
+
func ListKV(ctx context.Context, request *model.ListKVRequest) (int64,
*model.KVResponse, *errsvc.Error) {
+ sema.Acquire()
+ defer sema.Release()
opts := []datasource.FindOption{
datasource.WithKey(request.Key),
datasource.WithLabels(request.Labels),
diff --git a/test/benchmark/list.sh b/test/benchmark/list.sh
new file mode 100644
index 0000000..14feeff
--- /dev/null
+++ b/test/benchmark/list.sh
@@ -0,0 +1 @@
+ab -c 200 -n 20000
http://127.0.0.1:30110/v1/default/kie/kv?label=app:default&label=env:test
\ No newline at end of file
diff --git a/test/benchmark/watch.go b/test/benchmark/watch.go
new file mode 100644
index 0000000..1c14ea6
--- /dev/null
+++ b/test/benchmark/watch.go
@@ -0,0 +1,82 @@
+package main
+
+import (
+ "crypto/tls"
+ "errors"
+ "log"
+ "net/http"
+ "net/url"
+ "sync"
+ "sync/atomic"
+ "time"
+)
+
+var success int32
+var fail int32
+var total int32
+var clientNum = 10000
+var wg = sync.WaitGroup{}
+var client = &http.Client{}
+var start time.Time
+
+func main() {
+ u :=
"http://127.0.0.1:30110/v1/default/kie/kv?label=app:default&label=env:test&wait=5m"
+ req, err := http.NewRequest("GET", u, nil)
+ if err != nil {
+ log.Panic(err)
+ return
+ }
+ uri, err := url.Parse(u)
+ if err != nil {
+ log.Panic(err)
+ }
+ wg.Add(clientNum)
+ client = &http.Client{
+ Timeout: 1 * time.Minute,
+ }
+ if uri.Scheme == "https" {
+ client.Transport = &http.Transport{
+ TLSClientConfig: &tls.Config{
+ InsecureSkipVerify: true,
+ },
+ }
+ }
+ for n := 0; n < clientNum; n++ {
+ go func() {
+ defer wg.Done()
+ err = watch(req)
+ if err != nil {
+ atomic.AddInt32(&fail, 1)
+ return
+ }
+ atomic.AddInt32(&success, 1)
+ if total == 0 {
+ start = time.Now()
+ atomic.AddInt32(&total, 1)
+ }
+ }()
+
+ }
+
+ wg.Wait()
+ duration := time.Since(start)
+ log.Printf("success %d", success)
+ log.Printf("fail %d", fail)
+ log.Printf("takes %s", duration.String())
+}
+
+func watch(req *http.Request) error {
+ res, err := client.Do(req)
+ if err != nil {
+ log.Println(err)
+ return err
+ }
+ defer res.Body.Close()
+
+ if res.Status != "200 OK" {
+ log.Println(res.Status)
+ return errors.New("not OK")
+ }
+
+ return nil
+}
diff --git a/test/init.go b/test/init.go
index c5a91ad..696486e 100644
--- a/test/init.go
+++ b/test/init.go
@@ -42,8 +42,8 @@ func init() {
if err != nil {
panic(err)
}
- kind := archaius.GetString("TEST_DB_KIND", "mongo")
- uri := archaius.GetString("TEST_DB_URI",
"mongodb://kie:[email protected]:27017/kie")
+ kind := archaius.GetString("TEST_DB_KIND", "etcd")
+ uri := archaius.GetString("TEST_DB_URI", "http://127.0.0.1:2379")
archaius.Init(archaius.WithMemorySource())
archaius.Set("servicecomb.cipher.plugin", "default")
cipher.Init()