This is an automated email from the ASF dual-hosted git repository.
tianxiaoliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/servicecomb-kie.git
The following commit(s) were added to refs/heads/master by this push:
new e72007d add upload interface (#185)
e72007d is described below
commit e72007deef4cc926baebb0972f357be22af9b823
Author: Sphairis <[email protected]>
AuthorDate: Thu Jul 1 17:30:50 2021 +0800
add upload interface (#185)
* add upload interface
* add upload response/interface/test
* Fix error
* add override strategy
* split logic and add override logic
* key
* fix test error
* modify the view
* fix error
* change request body
* fix error
* rebuild code
* fix error
* fix error
* change location
* fix error
* change param location
* fix static check
* Refactor strategy
* fix static check
* delete folder
Co-authored-by: SphaIris <aaalixiaopei123>
Co-authored-by: little-cui <[email protected]>
---
.github/workflows/golangci-lint.yml | 2 +-
go.mod | 2 +-
go.sum | 2 +
pkg/common/common.go | 1 +
pkg/model/db_schema.go | 9 ++
pkg/model/kv.go | 14 ++
server/resource/v1/common.go | 34 +----
server/resource/v1/doc_struct.go | 16 ++
server/resource/v1/kv_resource.go | 96 ++++++------
server/resource/v1/kv_resource_test.go | 183 +++++++++++++++++++++++
server/service/kv/kv.go | 153 +++++++++++++++++++
server/service/kv/override.go | 38 +++++
server/service/kv/override_abort.go | 47 ++++++
server/service/kv/override_force.go | 71 +++++++++
server/service/kv/override_skip.go | 47 ++++++
server/service/mongo/counter/revision.go | 4 +-
server/service/mongo/history/dao.go | 10 +-
server/service/mongo/kv/kv_dao.go | 21 ++-
server/service/mongo/kv/kv_service.go | 26 ++--
server/service/mongo/session/session.go | 39 ++---
server/service/mongo/track/polling_detail_dao.go | 2 +-
server/service/mongo/view/view_service.go | 19 +--
22 files changed, 708 insertions(+), 128 deletions(-)
diff --git a/.github/workflows/golangci-lint.yml b/.github/workflows/golangci-lint.yml
index 9640ae5..508e33e 100644
--- a/.github/workflows/golangci-lint.yml
+++ b/.github/workflows/golangci-lint.yml
@@ -16,4 +16,4 @@ jobs:
uses: golangci/golangci-lint-action@v2
with:
version: v1.29
- args: --skip-dirs=examples --out-format=colored-line-number
--skip-files=.*_test.go$
\ No newline at end of file
+ args: --skip-dirs=examples,test --out-format=colored-line-number
--skip-files=.*_test.go$
\ No newline at end of file
diff --git a/go.mod b/go.mod
index 539d13b..4dfdd8a 100644
--- a/go.mod
+++ b/go.mod
@@ -2,7 +2,7 @@ module github.com/apache/servicecomb-kie
require (
github.com/emicklei/go-restful v2.12.0+incompatible
- github.com/go-chassis/cari v0.4.1-0.20210528013912-6da8395f7ff9
+ github.com/go-chassis/cari v0.4.1-0.20210619062801-7681f1cfc0e5
github.com/go-chassis/foundation v0.3.1-0.20210602072914-a580bed505d0
github.com/go-chassis/go-archaius v1.5.2-0.20210301074935-e4694f6b077b
github.com/go-chassis/go-chassis/v2 v2.1.2-0.20210308033545-985e98e20637
diff --git a/go.sum b/go.sum
index f0e08bd..6682d6b 100644
--- a/go.sum
+++ b/go.sum
@@ -124,6 +124,8 @@ github.com/go-chassis/cari v0.4.0
h1:2rJ7pB4dfZIu5/HQwwJhmybC1n/0MXWkKieoHCu4tQc
github.com/go-chassis/cari v0.4.0/go.mod
h1:av/19fqwEP4eOC8unL/z67AAbFDwXUCko6SKa4Avrd8=
github.com/go-chassis/cari v0.4.1-0.20210528013912-6da8395f7ff9
h1:UxNOY1mnK7i9qRYeu0d2jsVItfoV0ga75RF6isOhn00=
github.com/go-chassis/cari v0.4.1-0.20210528013912-6da8395f7ff9/go.mod
h1:av/19fqwEP4eOC8unL/z67AAbFDwXUCko6SKa4Avrd8=
+github.com/go-chassis/cari v0.4.1-0.20210619062801-7681f1cfc0e5
h1:KwRfC/uEqbi//YlNiuUhhf0ADkE3zf6uc43LWXsITRo=
+github.com/go-chassis/cari v0.4.1-0.20210619062801-7681f1cfc0e5/go.mod
h1:av/19fqwEP4eOC8unL/z67AAbFDwXUCko6SKa4Avrd8=
github.com/go-chassis/foundation v0.2.2-0.20201210043510-9f6d3de40234/go.mod
h1:2PjwqpVwYEVaAldl5A58a08viH8p27pNeYaiE3ZxOBA=
github.com/go-chassis/foundation v0.2.2/go.mod
h1:2PjwqpVwYEVaAldl5A58a08viH8p27pNeYaiE3ZxOBA=
github.com/go-chassis/foundation v0.3.0
h1:jG4BIrK8fXD9jbTtJ5rOLGQZ1pQI/mLnDuVJzToCtos=
diff --git a/pkg/common/common.go b/pkg/common/common.go
index 2ed8a39..205f2ef 100644
--- a/pkg/common/common.go
+++ b/pkg/common/common.go
@@ -38,6 +38,7 @@ const (
QueryParamIP = "ip"
QueryParamURLPath = "urlPath"
QueryParamUserAgent = "userAgent"
+ QueryParamOverride = "override"
)
//http headers
diff --git a/pkg/model/db_schema.go b/pkg/model/db_schema.go
index 4ebc5b1..43a5873 100644
--- a/pkg/model/db_schema.go
+++ b/pkg/model/db_schema.go
@@ -98,4 +98,13 @@ type ListKVRequest struct {
Offset int64 `validate:"min=0"`
Limit int64 `validate:"min=0,max=100"`
Status string `json:"status,omitempty"
yaml:"status,omitempty" validate:"kvStatus"`
+ Match string `json:"match,omitempty"
yaml:"match,omitempty"`
+}
+
+// UploadKVRequest contains kv list upload request params
+type UploadKVRequest struct {
+ Domain string `json:"domain,omitempty" yaml:"domain,omitempty"
validate:"min=1,max=256,commonName"` //redundant
+ Project string `json:"project,omitempty" yaml:"project,omitempty"
validate:"min=1,max=256,commonName"`
+ KVs []*KVDoc
+ Override string
}
diff --git a/pkg/model/kv.go b/pkg/model/kv.go
index 65545eb..c9c7a20 100644
--- a/pkg/model/kv.go
+++ b/pkg/model/kv.go
@@ -71,6 +71,20 @@ type DocResponseGetKey struct {
Total int64 `json:"total"`
}
+//DocRespOfUpload is response doc
+type DocRespOfUpload struct {
+ Success []*KVDoc `json:"success"`
+ Failure []*DocFailedOfUpload `json:"failure"`
+}
+
+//DocFailedOfUpload is response doc
+type DocFailedOfUpload struct {
+ Key string `json:"key"`
+ Labels map[string]string `json:"labels"`
+ ErrCode int32 `json:"error_code"`
+ ErrMsg string `json:"error_message"`
+}
+
//PollingDataResponse is response doc
type PollingDataResponse struct {
Data []*PollingDetail `json:"data"`
diff --git a/server/resource/v1/common.go b/server/resource/v1/common.go
index 70f6010..5383928 100644
--- a/server/resource/v1/common.go
+++ b/server/resource/v1/common.go
@@ -21,9 +21,7 @@ import (
"context"
"encoding/json"
"errors"
- "github.com/apache/servicecomb-kie/pkg/model"
- "github.com/apache/servicecomb-kie/server/service/mongo/session"
- "github.com/go-chassis/cari/config"
+ kvsvc "github.com/apache/servicecomb-kie/server/service/kv"
"github.com/go-chassis/cari/rbac"
"net/http"
"strconv"
@@ -31,9 +29,12 @@ import (
"time"
"github.com/apache/servicecomb-kie/pkg/common"
+ "github.com/apache/servicecomb-kie/pkg/model"
"github.com/apache/servicecomb-kie/server/pubsub"
"github.com/apache/servicecomb-kie/server/service"
+ "github.com/apache/servicecomb-kie/server/service/mongo/session"
goRestful "github.com/emicklei/go-restful"
+ "github.com/go-chassis/cari/config"
"github.com/go-chassis/go-chassis/v2/server/restful"
"github.com/go-chassis/openlog"
uuid "github.com/satori/go.uuid"
@@ -248,32 +249,13 @@ func checkDomainAndProject(domain, project string) error {
}
func queryAndResponse(rctx *restful.Context, request *model.ListKVRequest) {
- m := getMatchPattern(rctx)
- opts := []service.FindOption{
- service.WithKey(request.Key),
- service.WithLabels(request.Labels),
- service.WithOffset(request.Offset),
- service.WithLimit(request.Limit),
- }
- if m == common.PatternExact {
- opts = append(opts, service.WithExactLabels())
- }
- if request.Status != "" {
- opts = append(opts, service.WithStatus(request.Status))
- }
- rev, err := service.RevisionService.GetRevision(rctx.Ctx,
request.Domain)
- if err != nil {
- WriteErrResponse(rctx, config.ErrInternal, err.Error())
- return
- }
- kv, err := service.KVService.List(rctx.Ctx, request.Domain,
request.Project, opts...)
- if err != nil {
- openlog.Error("common: " + err.Error())
- WriteErrResponse(rctx, config.ErrInternal, common.MsgDBError)
+ rev, kv, queryErr := kvsvc.ListKV(rctx.Ctx, request)
+ if queryErr != nil {
+ WriteErrResponse(rctx, queryErr.Code, queryErr.Message)
return
}
rctx.ReadResponseWriter().Header().Set(common.HeaderRevision,
strconv.FormatInt(rev, 10))
- err = writeResponse(rctx, kv)
+ err := writeResponse(rctx, kv)
rctx.ReadRestfulRequest().SetAttribute(common.RespBodyContextKey,
kv.Data)
if err != nil {
openlog.Error(err.Error())
diff --git a/server/resource/v1/doc_struct.go b/server/resource/v1/doc_struct.go
index 045d06d..388bb0d 100644
--- a/server/resource/v1/doc_struct.go
+++ b/server/resource/v1/doc_struct.go
@@ -19,6 +19,7 @@ package v1
import (
"github.com/apache/servicecomb-kie/pkg/common"
+ "github.com/apache/servicecomb-kie/pkg/model"
goRestful "github.com/emicklei/go-restful"
"github.com/go-chassis/go-chassis/v2/server/restful"
@@ -170,6 +171,21 @@ type KVCreateBody struct {
ValueType string `json:"value_type"`
}
+//KVUploadBody is open api doc
+type KVUploadBody struct {
+ MetaData MetaData `json:"metadata"`
+ Data []*model.KVDoc `json:"data"`
+}
+
+//MetaData is extra info
+type MetaData struct {
+ Version string `json:"version"`
+ Annotations Annotations `json:"annotations"`
+}
+
+type Annotations struct {
+}
+
//KVUpdateBody is open api doc
type KVUpdateBody struct {
Status string `json:"status"`
diff --git a/server/resource/v1/kv_resource.go b/server/resource/v1/kv_resource.go
index 52d9a97..a2c863d 100644
--- a/server/resource/v1/kv_resource.go
+++ b/server/resource/v1/kv_resource.go
@@ -21,17 +21,16 @@ package v1
import (
"encoding/json"
"fmt"
+ kvsvc "github.com/apache/servicecomb-kie/server/service/kv"
"net/http"
"github.com/apache/servicecomb-kie/pkg/common"
"github.com/apache/servicecomb-kie/pkg/model"
"github.com/apache/servicecomb-kie/server/pubsub"
"github.com/apache/servicecomb-kie/server/service"
- "github.com/apache/servicecomb-kie/server/service/mongo/session"
goRestful "github.com/emicklei/go-restful"
"github.com/go-chassis/cari/config"
"github.com/go-chassis/foundation/validator"
- "github.com/go-chassis/go-chassis/v2/pkg/backends/quota"
"github.com/go-chassis/go-chassis/v2/server/restful"
"github.com/go-chassis/openlog"
)
@@ -40,64 +39,46 @@ import (
type KVResource struct {
}
-//Post create a kv
-func (r *KVResource) Post(rctx *restful.Context) {
+//Upload uploads kvs
+func (r *KVResource) Upload(rctx *restful.Context) {
var err error
- project := rctx.ReadPathParameter(common.PathParameterProject)
- kv := new(model.KVDoc)
- if err = readRequest(rctx, kv); err != nil {
+ inputUpload := new(KVUploadBody)
+ if err = readRequest(rctx, &inputUpload); err != nil {
WriteErrResponse(rctx, config.ErrInvalidParams,
fmt.Sprintf(FmtReadRequestError, err))
return
}
- domain := ReadDomain(rctx.Ctx)
- kv.Domain = domain
- kv.Project = project
- if kv.Status == "" {
- kv.Status = common.StatusDisabled
- }
- err = validator.Validate(kv)
- if err != nil {
- WriteErrResponse(rctx, config.ErrInvalidParams, err.Error())
- return
- }
- err = quota.PreCreate("", kv.Domain, "", 1)
+ result := kvsvc.Upload(rctx.Ctx, &model.UploadKVRequest{
+ Domain: ReadDomain(rctx.Ctx),
+ Project: rctx.ReadPathParameter(common.PathParameterProject),
+ KVs: inputUpload.Data,
+ Override: rctx.ReadQueryParameter(common.QueryParamOverride),
+ })
+ err = writeResponse(rctx, result)
if err != nil {
- if err == quota.ErrReached {
- openlog.Info(fmt.Sprintf("can not create kv %s@%s, due
to quota violation", kv.Key, kv.Project))
- WriteErrResponse(rctx, config.ErrNotEnoughQuota,
err.Error())
- return
- }
openlog.Error(err.Error())
- WriteErrResponse(rctx, config.ErrInternal, "quota check failed")
- return
}
- kv, err = service.KVService.Create(rctx.Ctx, kv)
- if err != nil {
- openlog.Error(fmt.Sprintf("post err:%s", err.Error()))
- if err == session.ErrKVAlreadyExists {
- WriteErrResponse(rctx, config.ErrRecordAlreadyExists,
err.Error())
- return
- }
- WriteErrResponse(rctx, config.ErrInternal, "create kv failed")
+}
+
+//Post create a kv
+func (r *KVResource) Post(rctx *restful.Context) {
+ var err error
+ kv := new(model.KVDoc)
+ if err = readRequest(rctx, kv); err != nil {
+ WriteErrResponse(rctx, config.ErrInvalidParams,
fmt.Sprintf(FmtReadRequestError, err))
return
}
- err = pubsub.Publish(&pubsub.KVChangeEvent{
- Key: kv.Key,
- Labels: kv.Labels,
- Project: project,
- DomainID: kv.Domain,
- Action: pubsub.ActionPut,
- })
- if err != nil {
- openlog.Warn("lost kv change event when post:" + err.Error())
+ kv.Domain = ReadDomain(rctx.Ctx)
+ kv.Project = rctx.ReadPathParameter(common.PathParameterProject)
+ kv, postErr := kvsvc.Post(rctx.Ctx, kv)
+ if postErr != nil {
+ WriteErrResponse(rctx, postErr.Code, postErr.Message)
+ return
}
- openlog.Info(
- fmt.Sprintf("post [%s] success", kv.ID))
+ kvsvc.Publish(kv)
err = writeResponse(rctx, kv)
if err != nil {
openlog.Error(err.Error())
}
-
}
//Put update a kv
@@ -182,6 +163,7 @@ func (r *KVResource) List(rctx *restful.Context) {
Domain: ReadDomain(rctx.Ctx),
Key: rctx.ReadQueryParameter(common.QueryParamKey),
Status: rctx.ReadQueryParameter(common.QueryParamStatus),
+ Match: getMatchPattern(rctx),
}
labels, err := getLabels(rctx)
if err != nil {
@@ -217,7 +199,7 @@ func returnData(rctx *restful.Context, request
*model.ListKVRequest) {
changed, err := eventHappened(rctx, wait, &pubsub.Topic{
Labels: request.Labels,
Project: request.Project,
- MatchType: getMatchPattern(rctx),
+ MatchType: request.Match,
DomainID: request.Domain,
})
if err != nil {
@@ -246,7 +228,7 @@ func returnData(rctx *restful.Context, request
*model.ListKVRequest) {
changed, err := eventHappened(rctx, wait, &pubsub.Topic{
Labels: request.Labels,
Project: request.Project,
- MatchType: getMatchPattern(rctx),
+ MatchType: request.Match,
DomainID: request.Domain,
})
if err != nil {
@@ -348,6 +330,24 @@ func (r *KVResource) URLPatterns() []restful.Route {
return []restful.Route{
{
Method: http.MethodPost,
+ Path: "/v1/{project}/kie/file",
+ ResourceFunc: r.Upload,
+ FuncDesc: "upload key values",
+ Parameters: []*restful.Parameters{
+ DocPathProject,
+ DocHeaderContentTypeJSONAndYaml,
+ },
+ Read: KVUploadBody{},
+ Returns: []*restful.Returns{
+ {
+ Code: http.StatusOK,
+ Model: model.DocRespOfUpload{},
+ },
+ },
+ Consumes: []string{goRestful.MIME_JSON,
common.ContentTypeYaml},
+ Produces: []string{goRestful.MIME_JSON,
common.ContentTypeYaml},
+ }, {
+ Method: http.MethodPost,
Path: "/v1/{project}/kie/kv",
ResourceFunc: r.Post,
FuncDesc: "create a key value",
diff --git a/server/resource/v1/kv_resource_test.go b/server/resource/v1/kv_resource_test.go
index 695ece2..1d7c1b3 100644
--- a/server/resource/v1/kv_resource_test.go
+++ b/server/resource/v1/kv_resource_test.go
@@ -367,6 +367,189 @@ func TestKVResource_List(t *testing.T) {
})
}
+func TestKVResource_Upload(t *testing.T) {
+ t.Run("test force with the same key and the same labels, and one
invalid input, should return 2 success and 1 failure", func(t *testing.T) {
+ input := new(v1.KVUploadBody)
+ input.Data = []*model.KVDoc{
+ {
+ Key: "1",
+ Value: "1",
+ Labels: map[string]string{"2": "2"},
+ },
+ {
+ Key: "1",
+ Value: "1",
+ Status: "invalid",
+ Labels: map[string]string{"1": "1"},
+ },
+ {
+ Key: "1",
+ Value: "1-update",
+ Labels: map[string]string{"2": "2"},
+ },
+ }
+ j, _ := json.Marshal(input)
+ r, _ := http.NewRequest("POST",
"/v1/kv_test/kie/file?override=force", bytes.NewBuffer(j))
+ r.Header.Set("Content-Type", "application/json")
+ kvr := &v1.KVResource{}
+ c, _ := restfultest.New(kvr, nil)
+ resp := httptest.NewRecorder()
+ c.ServeHTTP(resp, r)
+
+ body, err := ioutil.ReadAll(resp.Body)
+ assert.NoError(t, err)
+ data := &model.DocRespOfUpload{
+ Success: []*model.KVDoc{},
+ Failure: []*model.DocFailedOfUpload{},
+ }
+ err = json.Unmarshal(body, data)
+ assert.NoError(t, err)
+ assert.Equal(t, http.StatusOK, resp.Code)
+ assert.Equal(t, 1, len(data.Failure))
+ assert.Equal(t, 2, len(data.Success))
+ assert.Equal(t, data.Success[0].ID, data.Success[1].ID)
+ assert.Equal(t, "1-update", data.Success[1].Value)
+ })
+ t.Run("test force with the same key and not the same labels and ont
invalid input, should return 2 success and 1 failure", func(t *testing.T) {
+ input := new(v1.KVUploadBody)
+ input.Data = []*model.KVDoc{
+ {
+ Key: "2",
+ Value: "2",
+ Labels: map[string]string{"1": "1"},
+ },
+ {
+ Key: "2",
+ Value: "2",
+ Status: "invalid",
+ Labels: map[string]string{"1": "1"},
+ },
+
+ {
+ Key: "2",
+ Value: "2",
+ Labels: map[string]string{"2": "2"},
+ },
+ }
+ j, _ := json.Marshal(input)
+ r, _ := http.NewRequest("POST",
"/v1/kv_test/kie/file?override=force", bytes.NewBuffer(j))
+ r.Header.Set("Content-Type", "application/json")
+ kvr := &v1.KVResource{}
+ c, _ := restfultest.New(kvr, nil)
+ resp := httptest.NewRecorder()
+ c.ServeHTTP(resp, r)
+
+ body, err := ioutil.ReadAll(resp.Body)
+ assert.NoError(t, err)
+ data := &model.DocRespOfUpload{
+ Success: []*model.KVDoc{},
+ Failure: []*model.DocFailedOfUpload{},
+ }
+ err = json.Unmarshal(body, data)
+ assert.NoError(t, err)
+ assert.Equal(t, http.StatusOK, resp.Code)
+ assert.Equal(t, 1, len(data.Failure))
+ assert.Equal(t, 2, len(data.Success))
+ assert.NotEqual(t, data.Success[0].ID, data.Success[1].ID)
+ })
+ t.Run("test skip, with one invalid input, should return 2 success and 2
failure", func(t *testing.T) {
+ input := new(v1.KVUploadBody)
+ input.Data = []*model.KVDoc{
+ {
+ Key: "3",
+ Value: "1",
+ Labels: map[string]string{"2": "2"},
+ },
+ {
+ Key: "2",
+ Value: "2",
+ Status: "invalid",
+ Labels: map[string]string{"1": "1"},
+ },
+ {
+ Key: "3",
+ Value: "1-update",
+ Labels: map[string]string{"2": "2"},
+ },
+ {
+ Key: "4",
+ Value: "1",
+ Labels: map[string]string{"2": "2"},
+ ValueType: "text",
+ Status: "enabled",
+ },
+ }
+ j, _ := json.Marshal(input)
+ r, _ := http.NewRequest("POST",
"/v1/kv_test/kie/file?override=skip", bytes.NewBuffer(j))
+ r.Header.Set("Content-Type", "application/json")
+ kvr := &v1.KVResource{}
+ c, _ := restfultest.New(kvr, nil)
+ resp := httptest.NewRecorder()
+ c.ServeHTTP(resp, r)
+
+ body, err := ioutil.ReadAll(resp.Body)
+ assert.NoError(t, err)
+ data := &model.DocRespOfUpload{
+ Success: []*model.KVDoc{},
+ Failure: []*model.DocFailedOfUpload{},
+ }
+ err = json.Unmarshal(body, data)
+ assert.NoError(t, err)
+ assert.Equal(t, http.StatusOK, resp.Code)
+ assert.Equal(t, 2, len(data.Failure))
+ assert.Equal(t, 2, len(data.Success))
+ assert.Equal(t, "1", data.Success[0].Value)
+ assert.Equal(t, "1", data.Success[1].Value)
+ assert.Equal(t, "validate failed, field: KVDoc.Status, rule:
^$|^(enabled|disabled)$", data.Failure[0].ErrMsg)
+ assert.Equal(t, "skip overriding duplicate kvs",
data.Failure[1].ErrMsg)
+ })
+ t.Run("test abort, with one invalid input, should return 1 success and
3 failure", func(t *testing.T) {
+ input := new(v1.KVUploadBody)
+ input.Data = []*model.KVDoc{
+ {
+ Key: "5",
+ Value: "2",
+ Labels: map[string]string{"1": "1"},
+ },
+ {
+ Key: "5",
+ Value: "2-update",
+ Labels: map[string]string{"1": "1"},
+ },
+ {
+ Key: "5",
+ Value: "2-update",
+ Status: "invalid",
+ Labels: map[string]string{"1": "1"},
+ },
+ {
+ Key: "6",
+ Value: "2",
+ Labels: map[string]string{"4": "4"},
+ },
+ }
+ j, _ := json.Marshal(input)
+ r, _ := http.NewRequest("POST",
"/v1/kv_test/kie/file?override=abort", bytes.NewBuffer(j))
+ r.Header.Set("Content-Type", "application/json")
+ kvr := &v1.KVResource{}
+ c, _ := restfultest.New(kvr, nil)
+ resp := httptest.NewRecorder()
+ c.ServeHTTP(resp, r)
+
+ body, err := ioutil.ReadAll(resp.Body)
+ assert.NoError(t, err)
+ data := &model.DocRespOfUpload{
+ Success: []*model.KVDoc{},
+ Failure: []*model.DocFailedOfUpload{},
+ }
+ err = json.Unmarshal(body, data)
+ assert.NoError(t, err)
+ assert.Equal(t, http.StatusOK, resp.Code)
+ assert.Equal(t, 3, len(data.Failure))
+ assert.Equal(t, 1, len(data.Success))
+ assert.Equal(t, "2", data.Success[0].Value)
+ })
+}
func TestKVResource_PutAndGet(t *testing.T) {
var id string
kv := &model.KVDoc{
diff --git a/server/service/kv/kv.go b/server/service/kv/kv.go
new file mode 100644
index 0000000..2c80e6e
--- /dev/null
+++ b/server/service/kv/kv.go
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kv
+
+import (
+ "context"
+ "fmt"
+ "github.com/apache/servicecomb-kie/pkg/common"
+ "github.com/apache/servicecomb-kie/pkg/model"
+ "github.com/apache/servicecomb-kie/server/pubsub"
+ "github.com/apache/servicecomb-kie/server/service"
+ "github.com/apache/servicecomb-kie/server/service/mongo/session"
+ "github.com/go-chassis/cari/config"
+ "github.com/go-chassis/cari/pkg/errsvc"
+ "github.com/go-chassis/foundation/validator"
+ "github.com/go-chassis/go-chassis/v2/pkg/backends/quota"
+ "github.com/go-chassis/openlog"
+)
+
+func ListKV(ctx context.Context, request *model.ListKVRequest) (int64,
*model.KVResponse, *errsvc.Error) {
+ opts := []service.FindOption{
+ service.WithKey(request.Key),
+ service.WithLabels(request.Labels),
+ service.WithOffset(request.Offset),
+ service.WithLimit(request.Limit),
+ }
+ m := request.Match
+ if m == common.PatternExact {
+ opts = append(opts, service.WithExactLabels())
+ }
+ if request.Status != "" {
+ opts = append(opts, service.WithStatus(request.Status))
+ }
+ rev, err := service.RevisionService.GetRevision(ctx, request.Domain)
+ if err != nil {
+ return rev, nil, config.NewError(config.ErrInternal,
err.Error())
+ }
+ kv, err := service.KVService.List(ctx, request.Domain, request.Project,
opts...)
+ if err != nil {
+ openlog.Error("common: " + err.Error())
+ return rev, nil, config.NewError(config.ErrInternal,
common.MsgDBError)
+ }
+ return rev, kv, nil
+}
+
+func Post(ctx context.Context, kv *model.KVDoc) (*model.KVDoc, *errsvc.Error) {
+ if kv.Status == "" {
+ kv.Status = common.StatusDisabled
+ }
+ err := validator.Validate(kv)
+ if err != nil {
+ return nil, config.NewError(config.ErrInvalidParams,
err.Error())
+ }
+ err = quota.PreCreate("", kv.Domain, "", 1)
+ if err != nil {
+ if err == quota.ErrReached {
+ openlog.Error(fmt.Sprintf("can not create kv %s@%s, due
to quota violation", kv.Key, kv.Project))
+ return nil, config.NewError(config.ErrNotEnoughQuota,
err.Error())
+ }
+ openlog.Error(err.Error())
+ return nil, config.NewError(config.ErrInternal, "quota check
failed")
+ }
+ kv, err = service.KVService.Create(ctx, kv)
+ if err != nil {
+ openlog.Error(fmt.Sprintf("post err:%s", err.Error()))
+ if err == session.ErrKVAlreadyExists {
+ return nil,
config.NewError(config.ErrRecordAlreadyExists, err.Error())
+ }
+ return nil, config.NewError(config.ErrInternal, "create kv
failed")
+ }
+ return kv, nil
+}
+
+func Upload(ctx context.Context, request *model.UploadKVRequest)
*model.DocRespOfUpload {
+ override := request.Override
+ kvs := request.KVs
+ result := &model.DocRespOfUpload{
+ Success: []*model.KVDoc{},
+ Failure: []*model.DocFailedOfUpload{},
+ }
+ strategy := SelectStrategy(override)
+ for i, kv := range kvs {
+ if kv == nil {
+ continue
+ }
+ kv.Domain = request.Domain
+ kv.Project = request.Project
+ kv, err := strategy.Execute(ctx, kv)
+ if err != nil {
+ if err.Code == config.ErrStopUpload {
+ appendAbortFailedKVResult(kvs[i:], result)
+ break
+ }
+ appendFailedKVResult(err, kv, result)
+ continue
+ }
+
+ Publish(kv)
+ result.Success = append(result.Success, kv)
+ }
+ return result
+}
+
+func appendFailedKVResult(err *errsvc.Error, kv *model.KVDoc, result
*model.DocRespOfUpload) {
+ failedKv := &model.DocFailedOfUpload{
+ Key: kv.Key,
+ Labels: kv.Labels,
+ ErrCode: err.Code,
+ ErrMsg: err.Detail,
+ }
+ result.Failure = append(result.Failure, failedKv)
+}
+
+func appendAbortFailedKVResult(kvs []*model.KVDoc, result
*model.DocRespOfUpload) {
+ for _, kv := range kvs {
+ failedKv := &model.DocFailedOfUpload{
+ Key: kv.Key,
+ Labels: kv.Labels,
+ ErrCode: config.ErrStopUpload,
+ ErrMsg: "stop overriding kvs after reaching the
duplicate kv",
+ }
+ result.Failure = append(result.Failure, failedKv)
+ }
+}
+
+func Publish(kv *model.KVDoc) {
+ err := pubsub.Publish(&pubsub.KVChangeEvent{
+ Key: kv.Key,
+ Labels: kv.Labels,
+ Project: kv.Project,
+ DomainID: kv.Domain,
+ Action: pubsub.ActionPut,
+ })
+ if err != nil {
+ openlog.Warn("lost kv change event when post:" + err.Error())
+ }
+ openlog.Info(fmt.Sprintf("post [%s] success", kv.ID))
+}
diff --git a/server/service/kv/override.go b/server/service/kv/override.go
new file mode 100644
index 0000000..9f4af1a
--- /dev/null
+++ b/server/service/kv/override.go
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kv
+
+import (
+ "context"
+ "github.com/apache/servicecomb-kie/pkg/model"
+ "github.com/go-chassis/cari/pkg/errsvc"
+)
+
+var strategyMap = make(map[string]OverrideStrategy)
+
+type OverrideStrategy interface {
+ Execute(ctx context.Context, input *model.KVDoc) (*model.KVDoc,
*errsvc.Error)
+}
+
+func RegisterStrategy(override string, strategy OverrideStrategy) {
+ strategyMap[override] = strategy
+}
+
+func SelectStrategy(override string) OverrideStrategy {
+ return strategyMap[override]
+}
diff --git a/server/service/kv/override_abort.go b/server/service/kv/override_abort.go
new file mode 100644
index 0000000..9b1b236
--- /dev/null
+++ b/server/service/kv/override_abort.go
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kv
+
+import (
+ "context"
+ "fmt"
+ "github.com/apache/servicecomb-kie/pkg/model"
+ "github.com/go-chassis/cari/config"
+ "github.com/go-chassis/cari/pkg/errsvc"
+ "github.com/go-chassis/openlog"
+)
+
+func init() {
+ RegisterStrategy("abort", &Abort{})
+}
+
+type Abort struct {
+}
+
+func (a *Abort) Execute(ctx context.Context, kv *model.KVDoc) (*model.KVDoc,
*errsvc.Error) {
+ inputKV := kv
+ kv, err := Post(ctx, kv)
+ if err == nil {
+ return kv, nil
+ }
+ if err.Code == config.ErrRecordAlreadyExists {
+ openlog.Info(fmt.Sprintf("stop overriding duplicate [key: %s,
labels: %s]", inputKV.Key, inputKV.Labels))
+ return inputKV, config.NewError(config.ErrStopUpload, "stop
overriding duplicate kv")
+ }
+ return inputKV, err
+}
diff --git a/server/service/kv/override_force.go b/server/service/kv/override_force.go
new file mode 100644
index 0000000..e188430
--- /dev/null
+++ b/server/service/kv/override_force.go
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kv
+
+import (
+ "context"
+ "fmt"
+ "github.com/apache/servicecomb-kie/pkg/model"
+ "github.com/apache/servicecomb-kie/server/service"
+ "github.com/go-chassis/cari/config"
+ "github.com/go-chassis/cari/pkg/errsvc"
+ "github.com/go-chassis/openlog"
+)
+
+func init() {
+ RegisterStrategy("force", &Force{})
+}
+
+type Force struct {
+}
+
+func (f *Force) Execute(ctx context.Context, kv *model.KVDoc) (*model.KVDoc,
*errsvc.Error) {
+ input := kv
+ kv, err := Post(ctx, kv)
+ if err == nil {
+ return kv, nil
+ }
+ if err.Code != config.ErrRecordAlreadyExists {
+ return input, err
+ }
+
+ request := &model.ListKVRequest{
+ Project: input.Project,
+ Domain: input.Domain,
+ Key: input.Key,
+ Labels: input.Labels,
+ }
+ _, getKvsByOpts, getKvErr := ListKV(ctx, request)
+ if getKvErr != nil {
+ openlog.Info(fmt.Sprintf("get record [key: %s, labels: %s]
failed", input.Key, input.Labels))
+ return input, getKvErr
+ }
+ kvReq := &model.UpdateKVRequest{
+ ID: getKvsByOpts.Data[0].ID,
+ Value: input.Value,
+ Status: input.Status,
+ Project: input.Project,
+ Domain: input.Domain,
+ }
+ kv, updateErr := service.KVService.Update(ctx, kvReq)
+ if updateErr != nil {
+ openlog.Error(fmt.Sprintf("update record [key: %s, labels: %s]
failed", input.Key, input.Labels))
+ return input, config.NewError(config.ErrInternal,
updateErr.Error())
+ }
+ return kv, nil
+}
diff --git a/server/service/kv/override_skip.go b/server/service/kv/override_skip.go
new file mode 100644
index 0000000..7031e0a
--- /dev/null
+++ b/server/service/kv/override_skip.go
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kv
+
+import (
+ "context"
+ "fmt"
+ "github.com/apache/servicecomb-kie/pkg/model"
+ "github.com/go-chassis/cari/config"
+ "github.com/go-chassis/cari/pkg/errsvc"
+ "github.com/go-chassis/openlog"
+)
+
+func init() {
+ RegisterStrategy("skip", &Skip{})
+}
+
+type Skip struct {
+}
+
+func (s *Skip) Execute(ctx context.Context, kv *model.KVDoc) (*model.KVDoc,
*errsvc.Error) {
+ inputKV := kv
+ kv, err := Post(ctx, kv)
+ if err == nil {
+ return kv, nil
+ }
+ if err.Code == config.ErrRecordAlreadyExists {
+ openlog.Info(fmt.Sprintf("skip overriding duplicate [key: %s,
labels: %s]", inputKV.Key, inputKV.Labels))
+ return inputKV, config.NewError(config.ErrSkipDuplicateKV,
"skip overriding duplicate kvs")
+ }
+ return inputKV, err
+}
diff --git a/server/service/mongo/counter/revision.go b/server/service/mongo/counter/revision.go
index 67fffc6..7b638cf 100644
--- a/server/service/mongo/counter/revision.go
+++ b/server/service/mongo/counter/revision.go
@@ -63,8 +63,8 @@ func ApplyRevision(ctx context.Context, domain string)
(int64, error) {
filter := bson.M{"name": revision, "domain": domain}
sr := collection.FindOneAndUpdate(ctx, filter,
bson.D{
- {"$inc", bson.D{
- {"count", 1},
+ {Key: "$inc", Value: bson.D{
+ {Key: "count", Value: 1},
}}},
options.FindOneAndUpdate().SetReturnDocument(options.After))
if sr.Err() != nil {
return 0, sr.Err()
diff --git a/server/service/mongo/history/dao.go
b/server/service/mongo/history/dao.go
index a215d08..58e723a 100644
--- a/server/service/mongo/history/dao.go
+++ b/server/service/mongo/history/dao.go
@@ -99,10 +99,14 @@ func AddHistory(ctx context.Context, kv *model.KVDoc) error
{
func AddDeleteTime(ctx context.Context, kvIDs []string, project, domain
string) error {
collection := session.GetDB().Collection(session.CollectionKVRevision)
now := time.Now()
- filter := bson.D{{"id", bson.M{"$in": kvIDs}}, {"project", project},
{"domain", domain}}
+ filter := bson.D{
+ {Key: "id", Value: bson.M{"$in": kvIDs}},
+ {Key: "project", Value: project},
+ {Key: "domain", Value: domain},
+ }
_, err := collection.UpdateMany(ctx, filter, bson.D{
- {"$set", bson.D{
- {"delete_time", now},
+ {Key: "$set", Value: bson.D{
+ {Key: "delete_time", Value: now},
}},
})
if err != nil {
diff --git a/server/service/mongo/kv/kv_dao.go
b/server/service/mongo/kv/kv_dao.go
index 378e559..38ab84f 100644
--- a/server/service/mongo/kv/kv_dao.go
+++ b/server/service/mongo/kv/kv_dao.go
@@ -83,12 +83,12 @@ func updateKeyValue(ctx context.Context, kv *model.KVDoc)
error {
}
collection := session.GetDB().Collection(session.CollectionKV)
ur, err := collection.UpdateOne(ctx, bson.M{"key": kv.Key,
"label_format": kv.LabelFormat}, bson.D{
- {"$set", bson.D{
- {"value", kv.Value},
- {"status", kv.Status},
- {"checker", kv.Checker},
- {"update_time", kv.UpdateTime},
- {"update_revision", kv.UpdateRevision},
+ {Key: "$set", Value: bson.D{
+ {Key: "value", Value: kv.Value},
+ {Key: "status", Value: kv.Status},
+ {Key: "checker", Value: kv.Checker},
+ {Key: "update_time", Value: kv.UpdateTime},
+ {Key: "update_revision", Value: kv.UpdateRevision},
}},
})
if err != nil {
@@ -119,7 +119,9 @@ func getValue(str string) string {
func findKV(ctx context.Context, domain string, project string, opts
service.FindOptions) (*mongo.Cursor, int, error) {
collection := session.GetDB().Collection(session.CollectionKV)
- ctx, _ = context.WithTimeout(ctx, opts.Timeout)
+ ctx, cancel := context.WithTimeout(ctx, opts.Timeout)
+ defer cancel()
+
filter := bson.M{"domain": domain, "project": project}
if opts.Key != "" {
filter["key"] = opts.Key
@@ -216,7 +218,10 @@ func findOneKVAndDelete(ctx context.Context, kvID,
project, domain string) (*mod
//findKVsAndDelete deletes multiple kvs and return the deleted kv list as
these appeared before deletion
func findKVsAndDelete(ctx context.Context, kvIDs []string, project, domain
string) ([]*model.KVDoc, error) {
- filter := bson.D{{"id", bson.M{"$in": kvIDs}}, {"project", project},
{"domain", domain}}
+ filter := bson.D{
+ {Key: "id", Value: bson.M{"$in": kvIDs}},
+ {Key: "project", Value: project},
+ {Key: "domain", Value: domain}}
kvs, err := findKeys(ctx, filter, false)
if err != nil {
if err != service.ErrKeyNotExists {
diff --git a/server/service/mongo/kv/kv_service.go
b/server/service/mongo/kv/kv_service.go
index c1d0cae..eb085fc 100644
--- a/server/service/mongo/kv/kv_service.go
+++ b/server/service/mongo/kv/kv_service.go
@@ -19,10 +19,8 @@ package kv
import (
"context"
- "github.com/apache/servicecomb-kie/pkg/stringutil"
- "time"
-
"github.com/apache/servicecomb-kie/pkg/model"
+ "github.com/apache/servicecomb-kie/pkg/stringutil"
"github.com/apache/servicecomb-kie/pkg/util"
"github.com/apache/servicecomb-kie/server/service"
"github.com/apache/servicecomb-kie/server/service/mongo/session"
@@ -39,12 +37,13 @@ const (
//Service operate data in mongodb
type Service struct {
- timeout time.Duration
}
//Create will create a key value record
func (s *Service) Create(ctx context.Context, kv *model.KVDoc) (*model.KVDoc,
error) {
- ctx, _ = context.WithTimeout(ctx, session.Timeout)
+ ctx, cancel := context.WithTimeout(ctx, session.Timeout)
+ defer cancel()
+
if kv.Labels == nil {
kv.Labels = map[string]string{}
}
@@ -72,7 +71,9 @@ func (s *Service) Create(ctx context.Context, kv
*model.KVDoc) (*model.KVDoc, er
//Update will update a key value record
func (s *Service) Update(ctx context.Context, kv *model.UpdateKVRequest)
(*model.KVDoc, error) {
- ctx, _ = context.WithTimeout(ctx, session.Timeout)
+ ctx, cancel := context.WithTimeout(ctx, session.Timeout)
+ defer cancel()
+
getRequest := &model.GetKVRequest{
Domain: kv.Domain,
Project: kv.Project,
@@ -99,7 +100,9 @@ func (s *Service) Update(ctx context.Context, kv
*model.UpdateKVRequest) (*model
//Exist supports you query a key value by label map or labels id
func (s *Service) Exist(ctx context.Context, domain, key string, project
string, options ...service.FindOption) (*model.KVDoc, error) {
- ctx, _ = context.WithTimeout(context.Background(), session.Timeout)
+ ctx, cancel := context.WithTimeout(ctx, session.Timeout)
+ defer cancel()
+
opts := service.FindOptions{}
for _, o := range options {
o(&opts)
@@ -133,13 +136,15 @@ func (s *Service) Exist(ctx context.Context, domain, key
string, project string,
//FindOneAndDelete deletes one kv by id and return the deleted kv as these
appeared before deletion
//domain=tenant
func (s *Service) FindOneAndDelete(ctx context.Context, kvID string, domain
string, project string) (*model.KVDoc, error) {
- ctx, _ = context.WithTimeout(context.Background(), session.Timeout)
+ ctx, cancel := context.WithTimeout(ctx, session.Timeout)
+ defer cancel()
return findOneKVAndDelete(ctx, kvID, project, domain)
}
//FindManyAndDelete deletes multiple kvs and return the deleted kv list as
these appeared before deletion
func (s *Service) FindManyAndDelete(ctx context.Context, kvIDs []string,
domain string, project string) ([]*model.KVDoc, error) {
- ctx, _ = context.WithTimeout(context.Background(), session.Timeout)
+ ctx, cancel := context.WithTimeout(ctx, session.Timeout)
+ defer cancel()
return findKVsAndDelete(ctx, kvIDs, project, domain)
}
@@ -182,6 +187,7 @@ func (s *Service) Get(ctx context.Context, request
*model.GetKVRequest) (*model.
//Total return kv record number
func (s *Service) Total(ctx context.Context, domain string) (int64, error) {
- ctx, _ = context.WithTimeout(ctx, session.Timeout)
+ ctx, cancel := context.WithTimeout(ctx, session.Timeout)
+ defer cancel()
return total(ctx, domain)
}
diff --git a/server/service/mongo/session/session.go
b/server/service/mongo/session/session.go
index a5e5b0b..48b6b90 100644
--- a/server/service/mongo/session/session.go
+++ b/server/service/mongo/session/session.go
@@ -103,20 +103,20 @@ func Init() error {
once.Do(func() {
sc, _ :=
bsoncodec.NewStructCodec(bsoncodec.DefaultStructTagParser)
reg := bson.NewRegistryBuilder().
- RegisterEncoder(reflect.TypeOf(model.LabelDoc{}), sc).
- RegisterEncoder(reflect.TypeOf(model.KVDoc{}), sc).
+ RegisterTypeEncoder(reflect.TypeOf(model.LabelDoc{}),
sc).
+ RegisterTypeEncoder(reflect.TypeOf(model.KVDoc{}), sc).
Build()
uri := cipherutil.TryDecrypt(config.GetDB().URI)
clientOps :=
[]*options.ClientOptions{options.Client().ApplyURI(uri)}
if config.GetDB().SSLEnabled {
if config.GetDB().RootCA == "" {
- err = ErrRootCAMissing
+ openlog.Error(ErrRootCAMissing.Error())
return
}
pool := x509.NewCertPool()
caCert, err := ioutil.ReadFile(config.GetDB().RootCA)
if err != nil {
- err = fmt.Errorf("read ca cert file %s failed",
caCert)
+ openlog.Error(fmt.Sprintf("read ca cert file %s
failed", caCert))
return
}
pool.AppendCertsFromPEM(caCert)
@@ -133,7 +133,10 @@ func Init() error {
return
}
openlog.Info("DB connecting")
- ctx, _ := context.WithTimeout(context.Background(),
10*time.Second)
+
+ ctx, cancel := context.WithTimeout(context.Background(),
10*time.Second)
+ defer cancel()
+
err = client.Connect(ctx)
if err != nil {
return
@@ -157,9 +160,9 @@ func GetDB() *mongo.Database {
func CreateView(ctx context.Context, view, source string, pipeline
mongo.Pipeline) error {
sr := GetDB().RunCommand(ctx,
bson.D{
- {"create", view},
- {"viewOn", source},
- {"pipeline", pipeline},
+ {Key: "create", Value: view},
+ {Key: "viewOn", Value: source},
+ {Key: "pipeline", Value: pipeline},
})
if sr.Err() != nil {
openlog.Error("can not create view: " + sr.Err().Error())
@@ -186,18 +189,16 @@ func GetColInfo(ctx context.Context, name string)
(*CollectionInfo, error) {
return nil, ErrGetPipeline
}
defer cur.Close(ctx)
- for cur.Next(ctx) {
- openlog.Debug(cur.Current.String())
- c := &CollectionInfo{}
- err := cur.Decode(c)
- if err != nil {
- openlog.Error(err.Error())
- return nil, ErrGetPipeline
- }
- return c, nil
- break
+ if !cur.Next(ctx) {
+ return nil, ErrGetPipeline
+ }
+ openlog.Debug(cur.Current.String())
+ c := &CollectionInfo{}
+ if err := cur.Decode(c); err != nil {
+ openlog.Error(err.Error())
+ return nil, ErrGetPipeline
}
- return nil, ErrGetPipeline
+ return c, nil
}
//EnsureDB build mongo db schema
diff --git a/server/service/mongo/track/polling_detail_dao.go
b/server/service/mongo/track/polling_detail_dao.go
index 3380ebf..42bb812 100644
--- a/server/service/mongo/track/polling_detail_dao.go
+++ b/server/service/mongo/track/polling_detail_dao.go
@@ -45,7 +45,7 @@ func CreateOrUpdate(ctx context.Context, detail
*model.PollingDetail) (*model.Po
}
return nil, res.Err()
}
- _, err := collection.UpdateOne(ctx, queryFilter, bson.D{{"$set",
detail}})
+ _, err := collection.UpdateOne(ctx, queryFilter, bson.D{{Key: "$set",
Value: detail}})
if err != nil {
return nil, err
}
diff --git a/server/service/mongo/view/view_service.go
b/server/service/mongo/view/view_service.go
index 4c40350..049e7ff 100644
--- a/server/service/mongo/view/view_service.go
+++ b/server/service/mongo/view/view_service.go
@@ -27,12 +27,10 @@ import (
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
- "time"
)
//Service operate data in mongodb
type Service struct {
- timeout time.Duration
}
//Create insert a view data and create a mongo db view
@@ -42,8 +40,11 @@ func (s *Service) Create(ctx context.Context, viewDoc
*model.ViewDoc, options ..
}
var pipeline mongo.Pipeline = []bson.D{
{{
- "$match",
- bson.D{{"domain", viewDoc.Domain}, {"project",
viewDoc.Project}},
+ Key: "$match",
+ Value: bson.D{
+ {Key: "domain", Value: viewDoc.Domain},
+ {Key: "project", Value: viewDoc.Project},
+ },
}},
}
opts := service.FindOptions{}
@@ -51,11 +52,11 @@ func (s *Service) Create(ctx context.Context, viewDoc
*model.ViewDoc, options ..
o(&opts)
}
if opts.Key != "" {
- pipeline = append(pipeline, bson.D{{"$match", bson.D{{"key",
opts.Key}}}})
+ pipeline = append(pipeline, bson.D{{Key: "$match", Value:
bson.D{{Key: "key", Value: opts.Key}}}})
}
if len(opts.Labels) != 0 {
for k, v := range opts.Labels {
- pipeline = append(pipeline, bson.D{{"$match",
bson.D{{"labels." + k, v}}}})
+ pipeline = append(pipeline, bson.D{{Key: "$match",
Value: bson.D{{Key: "labels." + k, Value: v}}}})
}
}
viewDoc.ID = uuid.NewV4().String()
@@ -98,9 +99,9 @@ func (s *Service) Update(ctx context.Context, viewDoc
*model.ViewDoc) error {
"project": oldView.Project,
"id": oldView.ID},
bson.D{
- {"$set", bson.D{
- {"name", oldView.Display},
- {"criteria", oldView.Criteria},
+ {Key: "$set", Value: bson.D{
+ {Key: "name", Value: oldView.Display},
+ {Key: "criteria", Value: oldView.Criteria},
}},
})
if err != nil {