This is an automated email from the ASF dual-hosted git repository.
littlecui pushed a commit to branch patch-0.2.0
in repository https://gitbox.apache.org/repos/asf/servicecomb-kie.git
The following commit(s) were added to refs/heads/patch-0.2.0 by this push:
new 502dfdc [fix] 1,修复读写并发问题 2,修复search查询出错不返回err 3,解除健康检查对etcd的依赖
4,golang-lint问题修复 (#322)
502dfdc is described below
commit 502dfdc9f7f00f29985bb46328a86a4d106302d9
Author: tornado-ssy <[email protected]>
AuthorDate: Mon Apr 1 20:40:19 2024 +0800
[fix] 1,修复读写并发问题 2,修复search查询出错不返回err 3,解除健康检查对etcd的依赖 4,golang-lint问题修复
(#322)
* add value filter in ListKV API (#302)
Co-authored-by: tornado-ssy <[email protected]>
(cherry picked from commit 93bbb89069fe7b4dd017422e897246837dc665c1)
* fix the concurrent bug of KvIdCache
* fix the bug of do not report the error which occured in action of get
kvdocs from etcd
* fix the bug of do not report the error which occured in action of get
kvdocs from etcd
* resolve conflicts in master
* [fix] fix golangci-lint (#318)
Co-authored-by: songshiyuan 00649746 <[email protected]>
(cherry picked from commit 577408ac263c0518ecd53ab7dfe70dad1d3b1fd9)
* [fix] cancel the depency between healthcheck and etcd (#319)
Co-authored-by: songshiyuan 00649746 <[email protected]>
(cherry picked from commit fcacc0dabea387ed319d677bc26b056781a4a942)
---------
Co-authored-by: little-cui <[email protected]>
Co-authored-by: songshiyuan 00649746 <[email protected]>
---
.github/workflows/golangci-lint.yml | 2 +-
pkg/common/common.go | 2 ++
pkg/model/db_schema.go | 1 +
server/datasource/etcd/kv/kv_cache.go | 57 +++++++++++++++++++------------
server/datasource/etcd/kv/kv_dao.go | 3 ++
server/datasource/mongo/kv/kv_dao.go | 3 ++
server/datasource/options.go | 8 +++++
server/resource/v1/admin_resource.go | 10 ++++--
server/resource/v1/admin_resource_test.go | 18 ++++++++--
server/resource/v1/kv_resource.go | 1 +
server/resource/v1/kv_resource_test.go | 32 +++++++++++++++++
server/server.go | 7 ++--
server/service/kv/kv_svc.go | 5 +++
13 files changed, 119 insertions(+), 30 deletions(-)
diff --git a/.github/workflows/golangci-lint.yml
b/.github/workflows/golangci-lint.yml
index b977042..3c8ae4f 100644
--- a/.github/workflows/golangci-lint.yml
+++ b/.github/workflows/golangci-lint.yml
@@ -15,7 +15,7 @@ jobs:
- name: golangci-lint
uses: golangci/golangci-lint-action@v2
with:
- version: v1.51.2
+ version: v1.55.2
args: --enable gofmt,gocyclo,goimports,dupl,gosec --timeout 5m
--skip-dirs=examples,test --skip-files=.*_test.go$
static-checks:
runs-on: ubuntu-latest
diff --git a/pkg/common/common.go b/pkg/common/common.go
index 5188d32..7eb1d3d 100644
--- a/pkg/common/common.go
+++ b/pkg/common/common.go
@@ -27,6 +27,7 @@ const (
QueryParamRev = "revision"
QueryParamMatch = "match"
QueryParamKey = "key"
+ QueryParamValue = "value"
QueryParamLabel = "label"
QueryParamStatus = "status"
QueryParamOffset = "offset"
@@ -39,6 +40,7 @@ const (
QueryParamURLPath = "urlPath"
QueryParamUserAgent = "userAgent"
QueryParamOverride = "override"
+ QueryParamMode = "mode"
)
// http headers
diff --git a/pkg/model/db_schema.go b/pkg/model/db_schema.go
index 28b98d3..442da43 100644
--- a/pkg/model/db_schema.go
+++ b/pkg/model/db_schema.go
@@ -96,6 +96,7 @@ type ListKVRequest struct {
Project string `json:"project,omitempty"
yaml:"project,omitempty" validate:"min=1,max=256,commonName"`
Domain string `json:"domain,omitempty"
yaml:"domain,omitempty" validate:"min=1,max=256,commonName"` //redundant
Key string `json:"key" yaml:"key"
validate:"max=128,getKey"`
+ Value string `json:"value" yaml:"value" validate:"max=128"`
Labels map[string]string `json:"labels,omitempty"
yaml:"labels,omitempty" validate:"max=8,dive,keys,labelK,endkeys,labelV"`
//redundant
Offset int64 `validate:"min=0"`
Limit int64 `validate:"min=0,max=100"`
diff --git a/server/datasource/etcd/kv/kv_cache.go
b/server/datasource/etcd/kv/kv_cache.go
index 5776bf7..7cf6529 100644
--- a/server/datasource/etcd/kv/kv_cache.go
+++ b/server/datasource/etcd/kv/kv_cache.go
@@ -9,15 +9,16 @@ import (
"sync"
"time"
- "github.com/apache/servicecomb-kie/pkg/model"
- "github.com/apache/servicecomb-kie/pkg/stringutil"
- "github.com/apache/servicecomb-kie/server/datasource"
- "github.com/apache/servicecomb-kie/server/datasource/etcd/key"
"github.com/go-chassis/foundation/backoff"
"github.com/go-chassis/openlog"
"github.com/little-cui/etcdadpt"
goCache "github.com/patrickmn/go-cache"
"go.etcd.io/etcd/api/v3/mvccpb"
+
+ "github.com/apache/servicecomb-kie/pkg/model"
+ "github.com/apache/servicecomb-kie/pkg/stringutil"
+ "github.com/apache/servicecomb-kie/server/datasource"
+ "github.com/apache/servicecomb-kie/server/datasource/etcd/key"
)
func Init() {
@@ -35,8 +36,6 @@ const (
backOffMinInterval = 5 * time.Second
)
-type IDSet map[string]struct{}
-
type Cache struct {
timeOut time.Duration
client etcdadpt.Client
@@ -158,11 +157,13 @@ func (kc *Cache) cachePut(rsp *etcdadpt.Response) {
cacheKey := kc.GetCacheKey(kvDoc.Domain, kvDoc.Project,
kvDoc.Labels)
m, ok := kc.LoadKvIDSet(cacheKey)
if !ok {
- kc.StoreKvIDSet(cacheKey, IDSet{kvDoc.ID: struct{}{}})
+ z := &sync.Map{}
+ z.Store(kvDoc.ID, struct{}{})
+ kc.StoreKvIDSet(cacheKey, z)
openlog.Info("cacheKey " + cacheKey + "not exists")
continue
}
- m[kvDoc.ID] = struct{}{}
+ m.Store(kvDoc.ID, struct{}{})
}
}
@@ -180,23 +181,23 @@ func (kc *Cache) cacheDelete(rsp *etcdadpt.Response) {
openlog.Error("cacheKey " + cacheKey + "not exists")
continue
}
- delete(m, kvDoc.ID)
+ m.Delete(kvDoc.ID)
}
}
-func (kc *Cache) LoadKvIDSet(cacheKey string) (IDSet, bool) {
+func (kc *Cache) LoadKvIDSet(cacheKey string) (*sync.Map, bool) {
val, ok := kc.kvIDCache.Load(cacheKey)
if !ok {
return nil, false
}
- kvIds, ok := val.(IDSet)
+ kvIds, ok := val.(*sync.Map)
if !ok {
return nil, false
}
return kvIds, true
}
-func (kc *Cache) StoreKvIDSet(cacheKey string, kvIds IDSet) {
+func (kc *Cache) StoreKvIDSet(cacheKey string, kvIds *sync.Map) {
kc.kvIDCache.Store(cacheKey, kvIds)
}
@@ -232,22 +233,25 @@ func Search(ctx context.Context, req *CacheSearchReq)
(*model.KVResponse, bool,
cacheKey := kvCache.GetCacheKey(req.Domain, req.Project,
req.Opts.Labels)
kvIds, ok := kvCache.LoadKvIDSet(cacheKey)
if !ok {
- kvCache.StoreKvIDSet(cacheKey, IDSet{})
+ kvCache.StoreKvIDSet(cacheKey, &sync.Map{})
return result, true, nil
}
var docs []*model.KVDoc
var kvIdsLeft []string
- for kvID := range kvIds {
- if doc, ok := kvCache.LoadKvDoc(kvID); ok {
+ kvIds.Range(func(kvID, value any) bool {
+ if doc, ok := kvCache.LoadKvDoc(kvID.(string)); ok {
docs = append(docs, doc)
- continue
+ } else {
+ kvIdsLeft = append(kvIdsLeft, kvID.(string))
}
- kvIdsLeft = append(kvIdsLeft, kvID)
+ return true
+ })
+ tpData, err := kvCache.getKvFromEtcd(ctx, req, kvIdsLeft)
+ if err != nil {
+ return nil, true, err
}
-
- tpData := kvCache.getKvFromEtcd(ctx, req, kvIdsLeft)
docs = append(docs, tpData...)
for _, doc := range docs {
@@ -260,14 +264,15 @@ func Search(ctx context.Context, req *CacheSearchReq)
(*model.KVResponse, bool,
return result, true, nil
}
-func (kc *Cache) getKvFromEtcd(ctx context.Context, req *CacheSearchReq,
kvIdsLeft []string) []*model.KVDoc {
+func (kc *Cache) getKvFromEtcd(ctx context.Context, req *CacheSearchReq,
kvIdsLeft []string) ([]*model.KVDoc, error) {
if len(kvIdsLeft) == 0 {
- return nil
+ return nil, nil
}
openlog.Debug("get kv from etcd by kvId")
wg := sync.WaitGroup{}
docs := make([]*model.KVDoc, len(kvIdsLeft))
+ var getKvErr error
for i, kvID := range kvIdsLeft {
wg.Add(1)
go func(kvID string, cnt int) {
@@ -277,12 +282,14 @@ func (kc *Cache) getKvFromEtcd(ctx context.Context, req
*CacheSearchReq, kvIdsLe
kv, err := etcdadpt.Get(ctx, docKey)
if err != nil {
openlog.Error(fmt.Sprintf("failed to get kv
from etcd, err %v", err))
+ getKvErr = err
return
}
doc, err := kc.GetKvDoc(kv)
if err != nil {
openlog.Error(fmt.Sprintf("failed to unmarshal
kv, err %v", err))
+ getKvErr = err
return
}
@@ -291,7 +298,10 @@ func (kc *Cache) getKvFromEtcd(ctx context.Context, req
*CacheSearchReq, kvIdsLe
}(kvID, i)
}
wg.Wait()
- return docs
+ if getKvErr != nil {
+ return nil, getKvErr
+ }
+ return docs, nil
}
func isMatch(req *CacheSearchReq, doc *model.KVDoc) bool {
@@ -304,6 +314,9 @@ func isMatch(req *CacheSearchReq, doc *model.KVDoc) bool {
if req.Regex != nil && !req.Regex.MatchString(doc.Key) {
return false
}
+ if req.Opts.Value != "" && !strings.Contains(doc.Value, req.Opts.Value)
{
+ return false
+ }
return true
}
diff --git a/server/datasource/etcd/kv/kv_dao.go
b/server/datasource/etcd/kv/kv_dao.go
index 84c6958..d7260df 100644
--- a/server/datasource/etcd/kv/kv_dao.go
+++ b/server/datasource/etcd/kv/kv_dao.go
@@ -646,5 +646,8 @@ func filterMatch(doc *model.KVDoc, opts
datasource.FindOptions, regex *regexp.Re
if opts.LabelFormat != "" && doc.LabelFormat != opts.LabelFormat {
return false
}
+ if opts.Value != "" && !strings.Contains(doc.Value, opts.Value) {
+ return false
+ }
return true
}
diff --git a/server/datasource/mongo/kv/kv_dao.go
b/server/datasource/mongo/kv/kv_dao.go
index 79eb6ff..79ed45f 100644
--- a/server/datasource/mongo/kv/kv_dao.go
+++ b/server/datasource/mongo/kv/kv_dao.go
@@ -271,6 +271,9 @@ func findKV(ctx context.Context, domain string, project
string, opts datasource.
filter["key"] = bson.M{"$regex": "^" + value + "$",
"$options": "$i"}
}
}
+ if opts.Value != "" {
+ filter["value"] = bson.M{"$regex": opts.Value}
+ }
if len(opts.Labels) != 0 {
for k, v := range opts.Labels {
filter["labels."+k] = v
diff --git a/server/datasource/options.go b/server/datasource/options.go
index 2e311f5..7de8877 100644
--- a/server/datasource/options.go
+++ b/server/datasource/options.go
@@ -63,6 +63,7 @@ type FindOptions struct {
Depth int
ID string
Key string
+ Value string
Labels map[string]string
LabelFormat string
ClearLabel bool
@@ -115,6 +116,13 @@ func WithKey(key string) FindOption {
}
}
+// WithValue find by value
+func WithValue(value string) FindOption {
+ return func(o *FindOptions) {
+ o.Value = value
+ }
+}
+
// WithStatus enabled/disabled
func WithStatus(status string) FindOption {
return func(o *FindOptions) {
diff --git a/server/resource/v1/admin_resource.go
b/server/resource/v1/admin_resource.go
index 4d79eb1..d63b0a4 100644
--- a/server/resource/v1/admin_resource.go
+++ b/server/resource/v1/admin_resource.go
@@ -22,13 +22,15 @@ import (
"strconv"
"time"
- "github.com/apache/servicecomb-kie/pkg/model"
- "github.com/apache/servicecomb-kie/server/datasource"
goRestful "github.com/emicklei/go-restful"
"github.com/go-chassis/cari/config"
"github.com/go-chassis/go-chassis/v2/pkg/runtime"
"github.com/go-chassis/go-chassis/v2/server/restful"
"github.com/go-chassis/openlog"
+
+ "github.com/apache/servicecomb-kie/pkg/common"
+ "github.com/apache/servicecomb-kie/pkg/model"
+ "github.com/apache/servicecomb-kie/server/datasource"
)
type AdminResource struct {
@@ -57,6 +59,10 @@ func (r *AdminResource) URLPatterns() []restful.Route {
// HealthCheck provider version info and time info
func (r *AdminResource) HealthCheck(context *restful.Context) {
+ healthCheckMode := context.ReadQueryParameter(common.QueryParamMode)
+ if healthCheckMode == "liveness" {
+ return
+ }
domain := ReadDomain(context.Ctx)
resp := &model.DocHealthCheck{}
latest, err :=
datasource.GetBroker().GetRevisionDao().GetRevision(context.Ctx, domain)
diff --git a/server/resource/v1/admin_resource_test.go
b/server/resource/v1/admin_resource_test.go
index 2ed6c2f..d528030 100644
--- a/server/resource/v1/admin_resource_test.go
+++ b/server/resource/v1/admin_resource_test.go
@@ -27,10 +27,11 @@ import (
_ "github.com/apache/servicecomb-kie/test"
- "github.com/apache/servicecomb-kie/pkg/model"
- v1 "github.com/apache/servicecomb-kie/server/resource/v1"
"github.com/go-chassis/go-chassis/v2/server/restful/restfultest"
"github.com/stretchr/testify/assert"
+
+ "github.com/apache/servicecomb-kie/pkg/model"
+ v1 "github.com/apache/servicecomb-kie/server/resource/v1"
)
func Test_HeathCheck(t *testing.T) {
@@ -48,3 +49,16 @@ func Test_HeathCheck(t *testing.T) {
assert.NoError(t, err)
assert.NotEmpty(t, data)
}
+
+func Test_HeakthCheckLiveMode(t *testing.T) {
+ path := fmt.Sprintf("/v1/health?mode=liveness")
+ r, _ := http.NewRequest("GET", path, nil)
+
+ revision := &v1.AdminResource{}
+ c, err := restfultest.New(revision, nil)
+ assert.NoError(t, err)
+ resp := httptest.NewRecorder()
+ c.ServeHTTP(resp, r)
+ respcode := resp.Code
+ assert.NotEmpty(t, respcode)
+}
diff --git a/server/resource/v1/kv_resource.go
b/server/resource/v1/kv_resource.go
index 0f259d2..5a163fb 100644
--- a/server/resource/v1/kv_resource.go
+++ b/server/resource/v1/kv_resource.go
@@ -175,6 +175,7 @@ func (r *KVResource) List(rctx *restful.Context) {
Project: rctx.ReadPathParameter(common.PathParameterProject),
Domain: ReadDomain(rctx.Ctx),
Key: rctx.ReadQueryParameter(common.QueryParamKey),
+ Value: rctx.ReadQueryParameter(common.QueryParamValue),
Status: rctx.ReadQueryParameter(common.QueryParamStatus),
Match: getMatchPattern(rctx),
}
diff --git a/server/resource/v1/kv_resource_test.go
b/server/resource/v1/kv_resource_test.go
index cf25747..c52c3aa 100644
--- a/server/resource/v1/kv_resource_test.go
+++ b/server/resource/v1/kv_resource_test.go
@@ -253,6 +253,38 @@ func TestKVResource_List(t *testing.T) {
assert.NoError(t, err)
assert.Equal(t, 3, len(result.Data))
})
+ t.Run("list kv by value, should return 1 kv", func(t *testing.T) {
+ r, _ := http.NewRequest("GET", "/v1/kv_test/kie/kv?value=aaa",
nil)
+ r.Header.Set("Content-Type", "application/json")
+ kvr := &v1.KVResource{}
+ c, err := restfultest.New(kvr, nil)
+ assert.NoError(t, err)
+ resp := httptest.NewRecorder()
+ c.ServeHTTP(resp, r)
+ body, err := ioutil.ReadAll(resp.Body)
+ assert.NoError(t, err)
+ assert.Equal(t, http.StatusOK, resp.Code, string(body))
+ result := &model.KVResponse{}
+ err = json.Unmarshal(body, result)
+ assert.NoError(t, err)
+ assert.Equal(t, 1, len(result.Data))
+ })
+ t.Run("list kv by value, should return 1 kv", func(t *testing.T) {
+ r, _ := http.NewRequest("GET", "/v1/kv_test/kie/kv?value=AAA",
nil)
+ r.Header.Set("Content-Type", "application/json")
+ kvr := &v1.KVResource{}
+ c, err := restfultest.New(kvr, nil)
+ assert.NoError(t, err)
+ resp := httptest.NewRecorder()
+ c.ServeHTTP(resp, r)
+ body, err := ioutil.ReadAll(resp.Body)
+ assert.NoError(t, err)
+ assert.Equal(t, http.StatusOK, resp.Code, string(body))
+ result := &model.KVResponse{}
+ err = json.Unmarshal(body, result)
+ assert.NoError(t, err)
+ assert.Equal(t, 0, len(result.Data))
+ })
var rev string
t.Run("list kv by service label, exact match,should return 2 kv",
func(t *testing.T) {
r, _ := http.NewRequest("GET",
"/v1/kv_test/kie/kv?label=service:utService&match=exact", nil)
diff --git a/server/server.go b/server/server.go
index 4214262..93e4587 100644
--- a/server/server.go
+++ b/server/server.go
@@ -18,6 +18,10 @@
package server
import (
+ chassis "github.com/go-chassis/go-chassis/v2"
+ "github.com/go-chassis/go-chassis/v2/core/common"
+ "github.com/go-chassis/openlog"
+
"github.com/apache/servicecomb-kie/pkg/validator"
"github.com/apache/servicecomb-kie/server/config"
"github.com/apache/servicecomb-kie/server/datasource"
@@ -25,9 +29,6 @@ import (
"github.com/apache/servicecomb-kie/server/pubsub"
"github.com/apache/servicecomb-kie/server/rbac"
v1 "github.com/apache/servicecomb-kie/server/resource/v1"
- "github.com/go-chassis/go-chassis/v2"
- "github.com/go-chassis/go-chassis/v2/core/common"
- "github.com/go-chassis/openlog"
)
func Run() {
diff --git a/server/service/kv/kv_svc.go b/server/service/kv/kv_svc.go
index acac1da..f5e81c5 100644
--- a/server/service/kv/kv_svc.go
+++ b/server/service/kv/kv_svc.go
@@ -20,6 +20,7 @@ package kv
import (
"context"
"crypto/sha256"
+ "errors"
"fmt"
"strings"
"time"
@@ -45,6 +46,7 @@ var listSema =
concurrency.NewSemaphore(concurrency.DefaultConcurrency)
func ListKV(ctx context.Context, request *model.ListKVRequest) (int64,
*model.KVResponse, *errsvc.Error) {
opts := []datasource.FindOption{
datasource.WithKey(request.Key),
+ datasource.WithValue(request.Value),
datasource.WithLabels(request.Labels),
datasource.WithOffset(request.Offset),
datasource.WithLimit(request.Limit),
@@ -126,6 +128,9 @@ func Create(ctx context.Context, kv *model.KVDoc)
(*model.KVDoc, *errsvc.Error)
kv, err = datasource.GetBroker().GetKVDao().Create(ctx, kv,
datasource.WithSync(sync.FromContext(ctx)))
if err != nil {
openlog.Error(fmt.Sprintf("post err:%s", err.Error()))
+ if errors.Is(err, datasource.ErrKVAlreadyExists) {
+ err = config.NewError(config.ErrRecordAlreadyExists,
datasource.ErrKVAlreadyExists.Error())
+ }
return nil, util.SvcErr(err)
}
err = datasource.GetBroker().GetHistoryDao().AddHistory(ctx, kv)