This is an automated email from the ASF dual-hosted git repository.

tianxiaoliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/servicecomb-kie.git


The following commit(s) were added to refs/heads/master by this push:
     new e72007d  add upload interface (#185)
e72007d is described below

commit e72007deef4cc926baebb0972f357be22af9b823
Author: Sphairis <[email protected]>
AuthorDate: Thu Jul 1 17:30:50 2021 +0800

    add upload interface (#185)
    
    * add upload interface
    
    * add upload response/interface/test
    
    * Fix error
    
    * add override strategy
    
    * split logic and add override logic
    
    * key
    
    * fix test error
    
    * modify the view
    
    * fix error
    
    * change request body
    
    * fix error
    
    * rebuild code
    
    * fix error
    
    * fix error
    
    * change location
    
    * fix error
    
    * change param location
    
    * fix static check
    
    * Refactor strategy
    
    * fix static check
    
    * delete folder
    
    Co-authored-by: SphaIris <aaalixiaopei123>
    Co-authored-by: little-cui <[email protected]>
---
 .github/workflows/golangci-lint.yml              |   2 +-
 go.mod                                           |   2 +-
 go.sum                                           |   2 +
 pkg/common/common.go                             |   1 +
 pkg/model/db_schema.go                           |   9 ++
 pkg/model/kv.go                                  |  14 ++
 server/resource/v1/common.go                     |  34 +----
 server/resource/v1/doc_struct.go                 |  16 ++
 server/resource/v1/kv_resource.go                |  96 ++++++------
 server/resource/v1/kv_resource_test.go           | 183 +++++++++++++++++++++++
 server/service/kv/kv.go                          | 153 +++++++++++++++++++
 server/service/kv/override.go                    |  38 +++++
 server/service/kv/override_abort.go              |  47 ++++++
 server/service/kv/override_force.go              |  71 +++++++++
 server/service/kv/override_skip.go               |  47 ++++++
 server/service/mongo/counter/revision.go         |   4 +-
 server/service/mongo/history/dao.go              |  10 +-
 server/service/mongo/kv/kv_dao.go                |  21 ++-
 server/service/mongo/kv/kv_service.go            |  26 ++--
 server/service/mongo/session/session.go          |  39 ++---
 server/service/mongo/track/polling_detail_dao.go |   2 +-
 server/service/mongo/view/view_service.go        |  19 +--
 22 files changed, 708 insertions(+), 128 deletions(-)

diff --git a/.github/workflows/golangci-lint.yml 
b/.github/workflows/golangci-lint.yml
index 9640ae5..508e33e 100644
--- a/.github/workflows/golangci-lint.yml
+++ b/.github/workflows/golangci-lint.yml
@@ -16,4 +16,4 @@ jobs:
         uses: golangci/golangci-lint-action@v2
         with:
           version: v1.29
-          args: --skip-dirs=examples --out-format=colored-line-number 
--skip-files=.*_test.go$
\ No newline at end of file
+          args: --skip-dirs=examples,test --out-format=colored-line-number 
--skip-files=.*_test.go$
\ No newline at end of file
diff --git a/go.mod b/go.mod
index 539d13b..4dfdd8a 100644
--- a/go.mod
+++ b/go.mod
@@ -2,7 +2,7 @@ module github.com/apache/servicecomb-kie
 
 require (
        github.com/emicklei/go-restful v2.12.0+incompatible
-       github.com/go-chassis/cari v0.4.1-0.20210528013912-6da8395f7ff9
+       github.com/go-chassis/cari v0.4.1-0.20210619062801-7681f1cfc0e5
        github.com/go-chassis/foundation v0.3.1-0.20210602072914-a580bed505d0
        github.com/go-chassis/go-archaius v1.5.2-0.20210301074935-e4694f6b077b
        github.com/go-chassis/go-chassis/v2 v2.1.2-0.20210308033545-985e98e20637
diff --git a/go.sum b/go.sum
index f0e08bd..6682d6b 100644
--- a/go.sum
+++ b/go.sum
@@ -124,6 +124,8 @@ github.com/go-chassis/cari v0.4.0 
h1:2rJ7pB4dfZIu5/HQwwJhmybC1n/0MXWkKieoHCu4tQc
 github.com/go-chassis/cari v0.4.0/go.mod 
h1:av/19fqwEP4eOC8unL/z67AAbFDwXUCko6SKa4Avrd8=
 github.com/go-chassis/cari v0.4.1-0.20210528013912-6da8395f7ff9 
h1:UxNOY1mnK7i9qRYeu0d2jsVItfoV0ga75RF6isOhn00=
 github.com/go-chassis/cari v0.4.1-0.20210528013912-6da8395f7ff9/go.mod 
h1:av/19fqwEP4eOC8unL/z67AAbFDwXUCko6SKa4Avrd8=
+github.com/go-chassis/cari v0.4.1-0.20210619062801-7681f1cfc0e5 
h1:KwRfC/uEqbi//YlNiuUhhf0ADkE3zf6uc43LWXsITRo=
+github.com/go-chassis/cari v0.4.1-0.20210619062801-7681f1cfc0e5/go.mod 
h1:av/19fqwEP4eOC8unL/z67AAbFDwXUCko6SKa4Avrd8=
 github.com/go-chassis/foundation v0.2.2-0.20201210043510-9f6d3de40234/go.mod 
h1:2PjwqpVwYEVaAldl5A58a08viH8p27pNeYaiE3ZxOBA=
 github.com/go-chassis/foundation v0.2.2/go.mod 
h1:2PjwqpVwYEVaAldl5A58a08viH8p27pNeYaiE3ZxOBA=
 github.com/go-chassis/foundation v0.3.0 
h1:jG4BIrK8fXD9jbTtJ5rOLGQZ1pQI/mLnDuVJzToCtos=
diff --git a/pkg/common/common.go b/pkg/common/common.go
index 2ed8a39..205f2ef 100644
--- a/pkg/common/common.go
+++ b/pkg/common/common.go
@@ -38,6 +38,7 @@ const (
        QueryParamIP           = "ip"
        QueryParamURLPath      = "urlPath"
        QueryParamUserAgent    = "userAgent"
+       QueryParamOverride     = "override"
 )
 
 //http headers
diff --git a/pkg/model/db_schema.go b/pkg/model/db_schema.go
index 4ebc5b1..43a5873 100644
--- a/pkg/model/db_schema.go
+++ b/pkg/model/db_schema.go
@@ -98,4 +98,13 @@ type ListKVRequest struct {
        Offset  int64             `validate:"min=0"`
        Limit   int64             `validate:"min=0,max=100"`
        Status  string            `json:"status,omitempty" 
yaml:"status,omitempty" validate:"kvStatus"`
+       Match   string            `json:"match,omitempty" 
yaml:"match,omitempty"`
+}
+
+// UploadKVRequest contains kv list upload request params
+type UploadKVRequest struct {
+       Domain   string `json:"domain,omitempty" yaml:"domain,omitempty" 
validate:"min=1,max=256,commonName"` //redundant
+       Project  string `json:"project,omitempty" yaml:"project,omitempty" 
validate:"min=1,max=256,commonName"`
+       KVs      []*KVDoc
+       Override string
 }
diff --git a/pkg/model/kv.go b/pkg/model/kv.go
index 65545eb..c9c7a20 100644
--- a/pkg/model/kv.go
+++ b/pkg/model/kv.go
@@ -71,6 +71,20 @@ type DocResponseGetKey struct {
        Total int64                   `json:"total"`
 }
 
+//DocRespOfUpload is response doc
+type DocRespOfUpload struct {
+       Success []*KVDoc             `json:"success"`
+       Failure []*DocFailedOfUpload `json:"failure"`
+}
+
+//DocFailedOfUpload is reponse doc
+type DocFailedOfUpload struct {
+       Key     string            `json:"key"`
+       Labels  map[string]string `json:"labels"`
+       ErrCode int32             `json:"error_code"`
+       ErrMsg  string            `json:"error_message"`
+}
+
 //PollingDataResponse  is response doc
 type PollingDataResponse struct {
        Data  []*PollingDetail `json:"data"`
diff --git a/server/resource/v1/common.go b/server/resource/v1/common.go
index 70f6010..5383928 100644
--- a/server/resource/v1/common.go
+++ b/server/resource/v1/common.go
@@ -21,9 +21,7 @@ import (
        "context"
        "encoding/json"
        "errors"
-       "github.com/apache/servicecomb-kie/pkg/model"
-       "github.com/apache/servicecomb-kie/server/service/mongo/session"
-       "github.com/go-chassis/cari/config"
+       kvsvc "github.com/apache/servicecomb-kie/server/service/kv"
        "github.com/go-chassis/cari/rbac"
        "net/http"
        "strconv"
@@ -31,9 +29,12 @@ import (
        "time"
 
        "github.com/apache/servicecomb-kie/pkg/common"
+       "github.com/apache/servicecomb-kie/pkg/model"
        "github.com/apache/servicecomb-kie/server/pubsub"
        "github.com/apache/servicecomb-kie/server/service"
+       "github.com/apache/servicecomb-kie/server/service/mongo/session"
        goRestful "github.com/emicklei/go-restful"
+       "github.com/go-chassis/cari/config"
        "github.com/go-chassis/go-chassis/v2/server/restful"
        "github.com/go-chassis/openlog"
        uuid "github.com/satori/go.uuid"
@@ -248,32 +249,13 @@ func checkDomainAndProject(domain, project string) error {
 }
 
 func queryAndResponse(rctx *restful.Context, request *model.ListKVRequest) {
-       m := getMatchPattern(rctx)
-       opts := []service.FindOption{
-               service.WithKey(request.Key),
-               service.WithLabels(request.Labels),
-               service.WithOffset(request.Offset),
-               service.WithLimit(request.Limit),
-       }
-       if m == common.PatternExact {
-               opts = append(opts, service.WithExactLabels())
-       }
-       if request.Status != "" {
-               opts = append(opts, service.WithStatus(request.Status))
-       }
-       rev, err := service.RevisionService.GetRevision(rctx.Ctx, 
request.Domain)
-       if err != nil {
-               WriteErrResponse(rctx, config.ErrInternal, err.Error())
-               return
-       }
-       kv, err := service.KVService.List(rctx.Ctx, request.Domain, 
request.Project, opts...)
-       if err != nil {
-               openlog.Error("common: " + err.Error())
-               WriteErrResponse(rctx, config.ErrInternal, common.MsgDBError)
+       rev, kv, queryErr := kvsvc.ListKV(rctx.Ctx, request)
+       if queryErr != nil {
+               WriteErrResponse(rctx, queryErr.Code, queryErr.Message)
                return
        }
        rctx.ReadResponseWriter().Header().Set(common.HeaderRevision, 
strconv.FormatInt(rev, 10))
-       err = writeResponse(rctx, kv)
+       err := writeResponse(rctx, kv)
        rctx.ReadRestfulRequest().SetAttribute(common.RespBodyContextKey, 
kv.Data)
        if err != nil {
                openlog.Error(err.Error())
diff --git a/server/resource/v1/doc_struct.go b/server/resource/v1/doc_struct.go
index 045d06d..388bb0d 100644
--- a/server/resource/v1/doc_struct.go
+++ b/server/resource/v1/doc_struct.go
@@ -19,6 +19,7 @@ package v1
 
 import (
        "github.com/apache/servicecomb-kie/pkg/common"
+       "github.com/apache/servicecomb-kie/pkg/model"
 
        goRestful "github.com/emicklei/go-restful"
        "github.com/go-chassis/go-chassis/v2/server/restful"
@@ -170,6 +171,21 @@ type KVCreateBody struct {
        ValueType string            `json:"value_type"`
 }
 
+//KVUploadBody is open api doc
+type KVUploadBody struct {
+       MetaData MetaData       `json:"metadata"`
+       Data     []*model.KVDoc `json:"data"`
+}
+
+//MetaData is extra info
+type MetaData struct {
+       Version     string      `json:"version"`
+       Annotations Annotations `json:"annotations"`
+}
+
+type Annotations struct {
+}
+
 //KVUpdateBody is open api doc
 type KVUpdateBody struct {
        Status string `json:"status"`
diff --git a/server/resource/v1/kv_resource.go 
b/server/resource/v1/kv_resource.go
index 52d9a97..a2c863d 100644
--- a/server/resource/v1/kv_resource.go
+++ b/server/resource/v1/kv_resource.go
@@ -21,17 +21,16 @@ package v1
 import (
        "encoding/json"
        "fmt"
+       kvsvc "github.com/apache/servicecomb-kie/server/service/kv"
        "net/http"
 
        "github.com/apache/servicecomb-kie/pkg/common"
        "github.com/apache/servicecomb-kie/pkg/model"
        "github.com/apache/servicecomb-kie/server/pubsub"
        "github.com/apache/servicecomb-kie/server/service"
-       "github.com/apache/servicecomb-kie/server/service/mongo/session"
        goRestful "github.com/emicklei/go-restful"
        "github.com/go-chassis/cari/config"
        "github.com/go-chassis/foundation/validator"
-       "github.com/go-chassis/go-chassis/v2/pkg/backends/quota"
        "github.com/go-chassis/go-chassis/v2/server/restful"
        "github.com/go-chassis/openlog"
 )
@@ -40,64 +39,46 @@ import (
 type KVResource struct {
 }
 
-//Post create a kv
-func (r *KVResource) Post(rctx *restful.Context) {
+//Upload upload kvs
+func (r *KVResource) Upload(rctx *restful.Context) {
        var err error
-       project := rctx.ReadPathParameter(common.PathParameterProject)
-       kv := new(model.KVDoc)
-       if err = readRequest(rctx, kv); err != nil {
+       inputUpload := new(KVUploadBody)
+       if err = readRequest(rctx, &inputUpload); err != nil {
                WriteErrResponse(rctx, config.ErrInvalidParams, 
fmt.Sprintf(FmtReadRequestError, err))
                return
        }
-       domain := ReadDomain(rctx.Ctx)
-       kv.Domain = domain
-       kv.Project = project
-       if kv.Status == "" {
-               kv.Status = common.StatusDisabled
-       }
-       err = validator.Validate(kv)
-       if err != nil {
-               WriteErrResponse(rctx, config.ErrInvalidParams, err.Error())
-               return
-       }
-       err = quota.PreCreate("", kv.Domain, "", 1)
+       result := kvsvc.Upload(rctx.Ctx, &model.UploadKVRequest{
+               Domain:   ReadDomain(rctx.Ctx),
+               Project:  rctx.ReadPathParameter(common.PathParameterProject),
+               KVs:      inputUpload.Data,
+               Override: rctx.ReadQueryParameter(common.QueryParamOverride),
+       })
+       err = writeResponse(rctx, result)
        if err != nil {
-               if err == quota.ErrReached {
-                       openlog.Info(fmt.Sprintf("can not create kv %s@%s, due 
to quota violation", kv.Key, kv.Project))
-                       WriteErrResponse(rctx, config.ErrNotEnoughQuota, 
err.Error())
-                       return
-               }
                openlog.Error(err.Error())
-               WriteErrResponse(rctx, config.ErrInternal, "quota check failed")
-               return
        }
-       kv, err = service.KVService.Create(rctx.Ctx, kv)
-       if err != nil {
-               openlog.Error(fmt.Sprintf("post err:%s", err.Error()))
-               if err == session.ErrKVAlreadyExists {
-                       WriteErrResponse(rctx, config.ErrRecordAlreadyExists, 
err.Error())
-                       return
-               }
-               WriteErrResponse(rctx, config.ErrInternal, "create kv failed")
+}
+
+//Post create a kv
+func (r *KVResource) Post(rctx *restful.Context) {
+       var err error
+       kv := new(model.KVDoc)
+       if err = readRequest(rctx, kv); err != nil {
+               WriteErrResponse(rctx, config.ErrInvalidParams, 
fmt.Sprintf(FmtReadRequestError, err))
                return
        }
-       err = pubsub.Publish(&pubsub.KVChangeEvent{
-               Key:      kv.Key,
-               Labels:   kv.Labels,
-               Project:  project,
-               DomainID: kv.Domain,
-               Action:   pubsub.ActionPut,
-       })
-       if err != nil {
-               openlog.Warn("lost kv change event when post:" + err.Error())
+       kv.Domain = ReadDomain(rctx.Ctx)
+       kv.Project = rctx.ReadPathParameter(common.PathParameterProject)
+       kv, postErr := kvsvc.Post(rctx.Ctx, kv)
+       if postErr != nil {
+               WriteErrResponse(rctx, postErr.Code, postErr.Message)
+               return
        }
-       openlog.Info(
-               fmt.Sprintf("post [%s] success", kv.ID))
+       kvsvc.Publish(kv)
        err = writeResponse(rctx, kv)
        if err != nil {
                openlog.Error(err.Error())
        }
-
 }
 
 //Put update a kv
@@ -182,6 +163,7 @@ func (r *KVResource) List(rctx *restful.Context) {
                Domain:  ReadDomain(rctx.Ctx),
                Key:     rctx.ReadQueryParameter(common.QueryParamKey),
                Status:  rctx.ReadQueryParameter(common.QueryParamStatus),
+               Match:   getMatchPattern(rctx),
        }
        labels, err := getLabels(rctx)
        if err != nil {
@@ -217,7 +199,7 @@ func returnData(rctx *restful.Context, request 
*model.ListKVRequest) {
                changed, err := eventHappened(rctx, wait, &pubsub.Topic{
                        Labels:    request.Labels,
                        Project:   request.Project,
-                       MatchType: getMatchPattern(rctx),
+                       MatchType: request.Match,
                        DomainID:  request.Domain,
                })
                if err != nil {
@@ -246,7 +228,7 @@ func returnData(rctx *restful.Context, request 
*model.ListKVRequest) {
                        changed, err := eventHappened(rctx, wait, &pubsub.Topic{
                                Labels:    request.Labels,
                                Project:   request.Project,
-                               MatchType: getMatchPattern(rctx),
+                               MatchType: request.Match,
                                DomainID:  request.Domain,
                        })
                        if err != nil {
@@ -348,6 +330,24 @@ func (r *KVResource) URLPatterns() []restful.Route {
        return []restful.Route{
                {
                        Method:       http.MethodPost,
+                       Path:         "/v1/{project}/kie/file",
+                       ResourceFunc: r.Upload,
+                       FuncDesc:     "upload key values",
+                       Parameters: []*restful.Parameters{
+                               DocPathProject,
+                               DocHeaderContentTypeJSONAndYaml,
+                       },
+                       Read: KVUploadBody{},
+                       Returns: []*restful.Returns{
+                               {
+                                       Code:  http.StatusOK,
+                                       Model: model.DocRespOfUpload{},
+                               },
+                       },
+                       Consumes: []string{goRestful.MIME_JSON, 
common.ContentTypeYaml},
+                       Produces: []string{goRestful.MIME_JSON, 
common.ContentTypeYaml},
+               }, {
+                       Method:       http.MethodPost,
                        Path:         "/v1/{project}/kie/kv",
                        ResourceFunc: r.Post,
                        FuncDesc:     "create a key value",
diff --git a/server/resource/v1/kv_resource_test.go 
b/server/resource/v1/kv_resource_test.go
index 695ece2..1d7c1b3 100644
--- a/server/resource/v1/kv_resource_test.go
+++ b/server/resource/v1/kv_resource_test.go
@@ -367,6 +367,189 @@ func TestKVResource_List(t *testing.T) {
 
        })
 }
+func TestKVResource_Upload(t *testing.T) {
+       t.Run("test force with the same key and the same labels, and one 
invalid input, should return 2 success and 1 failure", func(t *testing.T) {
+               input := new(v1.KVUploadBody)
+               input.Data = []*model.KVDoc{
+                       {
+                               Key:    "1",
+                               Value:  "1",
+                               Labels: map[string]string{"2": "2"},
+                       },
+                       {
+                               Key:    "1",
+                               Value:  "1",
+                               Status: "invalid",
+                               Labels: map[string]string{"1": "1"},
+                       },
+                       {
+                               Key:    "1",
+                               Value:  "1-update",
+                               Labels: map[string]string{"2": "2"},
+                       },
+               }
+               j, _ := json.Marshal(input)
+               r, _ := http.NewRequest("POST", 
"/v1/kv_test/kie/file?override=force", bytes.NewBuffer(j))
+               r.Header.Set("Content-Type", "application/json")
+               kvr := &v1.KVResource{}
+               c, _ := restfultest.New(kvr, nil)
+               resp := httptest.NewRecorder()
+               c.ServeHTTP(resp, r)
+
+               body, err := ioutil.ReadAll(resp.Body)
+               assert.NoError(t, err)
+               data := &model.DocRespOfUpload{
+                       Success: []*model.KVDoc{},
+                       Failure: []*model.DocFailedOfUpload{},
+               }
+               err = json.Unmarshal(body, data)
+               assert.NoError(t, err)
+               assert.Equal(t, http.StatusOK, resp.Code)
+               assert.Equal(t, 1, len(data.Failure))
+               assert.Equal(t, 2, len(data.Success))
+               assert.Equal(t, data.Success[0].ID, data.Success[1].ID)
+               assert.Equal(t, "1-update", data.Success[1].Value)
+       })
+       t.Run("test force with the same key and not the same labels and ont 
invalid input, should return 2 success and 1 failure", func(t *testing.T) {
+               input := new(v1.KVUploadBody)
+               input.Data = []*model.KVDoc{
+                       {
+                               Key:    "2",
+                               Value:  "2",
+                               Labels: map[string]string{"1": "1"},
+                       },
+                       {
+                               Key:    "2",
+                               Value:  "2",
+                               Status: "invalid",
+                               Labels: map[string]string{"1": "1"},
+                       },
+
+                       {
+                               Key:    "2",
+                               Value:  "2",
+                               Labels: map[string]string{"2": "2"},
+                       },
+               }
+               j, _ := json.Marshal(input)
+               r, _ := http.NewRequest("POST", 
"/v1/kv_test/kie/file?override=force", bytes.NewBuffer(j))
+               r.Header.Set("Content-Type", "application/json")
+               kvr := &v1.KVResource{}
+               c, _ := restfultest.New(kvr, nil)
+               resp := httptest.NewRecorder()
+               c.ServeHTTP(resp, r)
+
+               body, err := ioutil.ReadAll(resp.Body)
+               assert.NoError(t, err)
+               data := &model.DocRespOfUpload{
+                       Success: []*model.KVDoc{},
+                       Failure: []*model.DocFailedOfUpload{},
+               }
+               err = json.Unmarshal(body, data)
+               assert.NoError(t, err)
+               assert.Equal(t, http.StatusOK, resp.Code)
+               assert.Equal(t, 1, len(data.Failure))
+               assert.Equal(t, 2, len(data.Success))
+               assert.NotEqual(t, data.Success[0].ID, data.Success[1].ID)
+       })
+       t.Run("test skip, with one invalid input, should return 2 success and 2 
failure", func(t *testing.T) {
+               input := new(v1.KVUploadBody)
+               input.Data = []*model.KVDoc{
+                       {
+                               Key:    "3",
+                               Value:  "1",
+                               Labels: map[string]string{"2": "2"},
+                       },
+                       {
+                               Key:    "2",
+                               Value:  "2",
+                               Status: "invalid",
+                               Labels: map[string]string{"1": "1"},
+                       },
+                       {
+                               Key:    "3",
+                               Value:  "1-update",
+                               Labels: map[string]string{"2": "2"},
+                       },
+                       {
+                               Key:       "4",
+                               Value:     "1",
+                               Labels:    map[string]string{"2": "2"},
+                               ValueType: "text",
+                               Status:    "enabled",
+                       },
+               }
+               j, _ := json.Marshal(input)
+               r, _ := http.NewRequest("POST", 
"/v1/kv_test/kie/file?override=skip", bytes.NewBuffer(j))
+               r.Header.Set("Content-Type", "application/json")
+               kvr := &v1.KVResource{}
+               c, _ := restfultest.New(kvr, nil)
+               resp := httptest.NewRecorder()
+               c.ServeHTTP(resp, r)
+
+               body, err := ioutil.ReadAll(resp.Body)
+               assert.NoError(t, err)
+               data := &model.DocRespOfUpload{
+                       Success: []*model.KVDoc{},
+                       Failure: []*model.DocFailedOfUpload{},
+               }
+               err = json.Unmarshal(body, data)
+               assert.NoError(t, err)
+               assert.Equal(t, http.StatusOK, resp.Code)
+               assert.Equal(t, 2, len(data.Failure))
+               assert.Equal(t, 2, len(data.Success))
+               assert.Equal(t, "1", data.Success[0].Value)
+               assert.Equal(t, "1", data.Success[1].Value)
+               assert.Equal(t, "validate failed, field: KVDoc.Status, rule: 
^$|^(enabled|disabled)$", data.Failure[0].ErrMsg)
+               assert.Equal(t, "skip overriding duplicate kvs", 
data.Failure[1].ErrMsg)
+       })
+       t.Run("test abort, with one invalid input, should return 1 success and 
3 failure", func(t *testing.T) {
+               input := new(v1.KVUploadBody)
+               input.Data = []*model.KVDoc{
+                       {
+                               Key:    "5",
+                               Value:  "2",
+                               Labels: map[string]string{"1": "1"},
+                       },
+                       {
+                               Key:    "5",
+                               Value:  "2-update",
+                               Labels: map[string]string{"1": "1"},
+                       },
+                       {
+                               Key:    "5",
+                               Value:  "2-update",
+                               Status: "invalid",
+                               Labels: map[string]string{"1": "1"},
+                       },
+                       {
+                               Key:    "6",
+                               Value:  "2",
+                               Labels: map[string]string{"4": "4"},
+                       },
+               }
+               j, _ := json.Marshal(input)
+               r, _ := http.NewRequest("POST", 
"/v1/kv_test/kie/file?override=abort", bytes.NewBuffer(j))
+               r.Header.Set("Content-Type", "application/json")
+               kvr := &v1.KVResource{}
+               c, _ := restfultest.New(kvr, nil)
+               resp := httptest.NewRecorder()
+               c.ServeHTTP(resp, r)
+
+               body, err := ioutil.ReadAll(resp.Body)
+               assert.NoError(t, err)
+               data := &model.DocRespOfUpload{
+                       Success: []*model.KVDoc{},
+                       Failure: []*model.DocFailedOfUpload{},
+               }
+               err = json.Unmarshal(body, data)
+               assert.NoError(t, err)
+               assert.Equal(t, http.StatusOK, resp.Code)
+               assert.Equal(t, 3, len(data.Failure))
+               assert.Equal(t, 1, len(data.Success))
+               assert.Equal(t, "2", data.Success[0].Value)
+       })
+}
 func TestKVResource_PutAndGet(t *testing.T) {
        var id string
        kv := &model.KVDoc{
diff --git a/server/service/kv/kv.go b/server/service/kv/kv.go
new file mode 100644
index 0000000..2c80e6e
--- /dev/null
+++ b/server/service/kv/kv.go
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kv
+
+import (
+       "context"
+       "fmt"
+       "github.com/apache/servicecomb-kie/pkg/common"
+       "github.com/apache/servicecomb-kie/pkg/model"
+       "github.com/apache/servicecomb-kie/server/pubsub"
+       "github.com/apache/servicecomb-kie/server/service"
+       "github.com/apache/servicecomb-kie/server/service/mongo/session"
+       "github.com/go-chassis/cari/config"
+       "github.com/go-chassis/cari/pkg/errsvc"
+       "github.com/go-chassis/foundation/validator"
+       "github.com/go-chassis/go-chassis/v2/pkg/backends/quota"
+       "github.com/go-chassis/openlog"
+)
+
+func ListKV(ctx context.Context, request *model.ListKVRequest) (int64, 
*model.KVResponse, *errsvc.Error) {
+       opts := []service.FindOption{
+               service.WithKey(request.Key),
+               service.WithLabels(request.Labels),
+               service.WithOffset(request.Offset),
+               service.WithLimit(request.Limit),
+       }
+       m := request.Match
+       if m == common.PatternExact {
+               opts = append(opts, service.WithExactLabels())
+       }
+       if request.Status != "" {
+               opts = append(opts, service.WithStatus(request.Status))
+       }
+       rev, err := service.RevisionService.GetRevision(ctx, request.Domain)
+       if err != nil {
+               return rev, nil, config.NewError(config.ErrInternal, 
err.Error())
+       }
+       kv, err := service.KVService.List(ctx, request.Domain, request.Project, 
opts...)
+       if err != nil {
+               openlog.Error("common: " + err.Error())
+               return rev, nil, config.NewError(config.ErrInternal, 
common.MsgDBError)
+       }
+       return rev, kv, nil
+}
+
+func Post(ctx context.Context, kv *model.KVDoc) (*model.KVDoc, *errsvc.Error) {
+       if kv.Status == "" {
+               kv.Status = common.StatusDisabled
+       }
+       err := validator.Validate(kv)
+       if err != nil {
+               return nil, config.NewError(config.ErrInvalidParams, 
err.Error())
+       }
+       err = quota.PreCreate("", kv.Domain, "", 1)
+       if err != nil {
+               if err == quota.ErrReached {
+                       openlog.Error(fmt.Sprintf("can not create kv %s@%s, due 
to quota violation", kv.Key, kv.Project))
+                       return nil, config.NewError(config.ErrNotEnoughQuota, 
err.Error())
+               }
+               openlog.Error(err.Error())
+               return nil, config.NewError(config.ErrInternal, "quota check 
failed")
+       }
+       kv, err = service.KVService.Create(ctx, kv)
+       if err != nil {
+               openlog.Error(fmt.Sprintf("post err:%s", err.Error()))
+               if err == session.ErrKVAlreadyExists {
+                       return nil, 
config.NewError(config.ErrRecordAlreadyExists, err.Error())
+               }
+               return nil, config.NewError(config.ErrInternal, "create kv 
failed")
+       }
+       return kv, nil
+}
+
+func Upload(ctx context.Context, request *model.UploadKVRequest) 
*model.DocRespOfUpload {
+       override := request.Override
+       kvs := request.KVs
+       result := &model.DocRespOfUpload{
+               Success: []*model.KVDoc{},
+               Failure: []*model.DocFailedOfUpload{},
+       }
+       strategy := SelectStrategy(override)
+       for i, kv := range kvs {
+               if kv == nil {
+                       continue
+               }
+               kv.Domain = request.Domain
+               kv.Project = request.Project
+               kv, err := strategy.Execute(ctx, kv)
+               if err != nil {
+                       if err.Code == config.ErrStopUpload {
+                               appendAbortFailedKVResult(kvs[i:], result)
+                               break
+                       }
+                       appendFailedKVResult(err, kv, result)
+                       continue
+               }
+
+               Publish(kv)
+               result.Success = append(result.Success, kv)
+       }
+       return result
+}
+
+func appendFailedKVResult(err *errsvc.Error, kv *model.KVDoc, result 
*model.DocRespOfUpload) {
+       failedKv := &model.DocFailedOfUpload{
+               Key:     kv.Key,
+               Labels:  kv.Labels,
+               ErrCode: err.Code,
+               ErrMsg:  err.Detail,
+       }
+       result.Failure = append(result.Failure, failedKv)
+}
+
+func appendAbortFailedKVResult(kvs []*model.KVDoc, result 
*model.DocRespOfUpload) {
+       for _, kv := range kvs {
+               failedKv := &model.DocFailedOfUpload{
+                       Key:     kv.Key,
+                       Labels:  kv.Labels,
+                       ErrCode: config.ErrStopUpload,
+                       ErrMsg:  "stop overriding kvs after reaching the 
duplicate kv",
+               }
+               result.Failure = append(result.Failure, failedKv)
+       }
+}
+
+func Publish(kv *model.KVDoc) {
+       err := pubsub.Publish(&pubsub.KVChangeEvent{
+               Key:      kv.Key,
+               Labels:   kv.Labels,
+               Project:  kv.Project,
+               DomainID: kv.Domain,
+               Action:   pubsub.ActionPut,
+       })
+       if err != nil {
+               openlog.Warn("lost kv change event when post:" + err.Error())
+       }
+       openlog.Info(fmt.Sprintf("post [%s] success", kv.ID))
+}
diff --git a/server/service/kv/override.go b/server/service/kv/override.go
new file mode 100644
index 0000000..9f4af1a
--- /dev/null
+++ b/server/service/kv/override.go
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kv
+
+import (
+       "context"
+       "github.com/apache/servicecomb-kie/pkg/model"
+       "github.com/go-chassis/cari/pkg/errsvc"
+)
+
+var strategyMap = make(map[string]OverrideStrategy)
+
+type OverrideStrategy interface {
+       Execute(ctx context.Context, input *model.KVDoc) (*model.KVDoc, 
*errsvc.Error)
+}
+
+func RegisterStrategy(override string, strategy OverrideStrategy) {
+       strategyMap[override] = strategy
+}
+
+func SelectStrategy(override string) OverrideStrategy {
+       return strategyMap[override]
+}
diff --git a/server/service/kv/override_abort.go 
b/server/service/kv/override_abort.go
new file mode 100644
index 0000000..9b1b236
--- /dev/null
+++ b/server/service/kv/override_abort.go
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kv
+
+import (
+       "context"
+       "fmt"
+       "github.com/apache/servicecomb-kie/pkg/model"
+       "github.com/go-chassis/cari/config"
+       "github.com/go-chassis/cari/pkg/errsvc"
+       "github.com/go-chassis/openlog"
+)
+
+func init() {
+       RegisterStrategy("abort", &Abort{})
+}
+
+type Abort struct {
+}
+
+func (a *Abort) Execute(ctx context.Context, kv *model.KVDoc) (*model.KVDoc, 
*errsvc.Error) {
+       inputKV := kv
+       kv, err := Post(ctx, kv)
+       if err == nil {
+               return kv, nil
+       }
+       if err.Code == config.ErrRecordAlreadyExists {
+               openlog.Info(fmt.Sprintf("stop overriding duplicate [key: %s, 
labels: %s]", inputKV.Key, inputKV.Labels))
+               return inputKV, config.NewError(config.ErrStopUpload, "stop 
overriding duplicate kv")
+       }
+       return inputKV, err
+}
diff --git a/server/service/kv/override_force.go 
b/server/service/kv/override_force.go
new file mode 100644
index 0000000..e188430
--- /dev/null
+++ b/server/service/kv/override_force.go
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kv
+
+import (
+       "context"
+       "fmt"
+       "github.com/apache/servicecomb-kie/pkg/model"
+       "github.com/apache/servicecomb-kie/server/service"
+       "github.com/go-chassis/cari/config"
+       "github.com/go-chassis/cari/pkg/errsvc"
+       "github.com/go-chassis/openlog"
+)
+
+func init() {
+       RegisterStrategy("force", &Force{})
+}
+
+type Force struct {
+}
+
+func (f *Force) Execute(ctx context.Context, kv *model.KVDoc) (*model.KVDoc, 
*errsvc.Error) {
+       input := kv
+       kv, err := Post(ctx, kv)
+       if err == nil {
+               return kv, nil
+       }
+       if err.Code != config.ErrRecordAlreadyExists {
+               return input, err
+       }
+
+       request := &model.ListKVRequest{
+               Project: input.Project,
+               Domain:  input.Domain,
+               Key:     input.Key,
+               Labels:  input.Labels,
+       }
+       _, getKvsByOpts, getKvErr := ListKV(ctx, request)
+       if getKvErr != nil {
+               openlog.Info(fmt.Sprintf("get record [key: %s, labels: %s] 
failed", input.Key, input.Labels))
+               return input, getKvErr
+       }
+       kvReq := &model.UpdateKVRequest{
+               ID:      getKvsByOpts.Data[0].ID,
+               Value:   input.Value,
+               Status:  input.Status,
+               Project: input.Project,
+               Domain:  input.Domain,
+       }
+       kv, updateErr := service.KVService.Update(ctx, kvReq)
+       if updateErr != nil {
+               openlog.Error(fmt.Sprintf("update record [key: %s, labels: %s] 
failed", input.Key, input.Labels))
+               return input, config.NewError(config.ErrInternal, 
updateErr.Error())
+       }
+       return kv, nil
+}
diff --git a/server/service/kv/override_skip.go 
b/server/service/kv/override_skip.go
new file mode 100644
index 0000000..7031e0a
--- /dev/null
+++ b/server/service/kv/override_skip.go
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kv
+
+import (
+       "context"
+       "fmt"
+       "github.com/apache/servicecomb-kie/pkg/model"
+       "github.com/go-chassis/cari/config"
+       "github.com/go-chassis/cari/pkg/errsvc"
+       "github.com/go-chassis/openlog"
+)
+
+func init() {
+       RegisterStrategy("skip", &Skip{})
+}
+
+type Skip struct {
+}
+
+func (s *Skip) Execute(ctx context.Context, kv *model.KVDoc) (*model.KVDoc, 
*errsvc.Error) {
+       inputKV := kv
+       kv, err := Post(ctx, kv)
+       if err == nil {
+               return kv, nil
+       }
+       if err.Code == config.ErrRecordAlreadyExists {
+               openlog.Info(fmt.Sprintf("skip overriding duplicate [key: %s, 
labels: %s]", inputKV.Key, inputKV.Labels))
+               return inputKV, config.NewError(config.ErrSkipDuplicateKV, 
"skip overriding duplicate kvs")
+       }
+       return inputKV, err
+}
diff --git a/server/service/mongo/counter/revision.go 
b/server/service/mongo/counter/revision.go
index 67fffc6..7b638cf 100644
--- a/server/service/mongo/counter/revision.go
+++ b/server/service/mongo/counter/revision.go
@@ -63,8 +63,8 @@ func ApplyRevision(ctx context.Context, domain string) 
(int64, error) {
        filter := bson.M{"name": revision, "domain": domain}
        sr := collection.FindOneAndUpdate(ctx, filter,
                bson.D{
-                       {"$inc", bson.D{
-                               {"count", 1},
+                       {Key: "$inc", Value: bson.D{
+                               {Key: "count", Value: 1},
                        }}}, 
options.FindOneAndUpdate().SetReturnDocument(options.After))
        if sr.Err() != nil {
                return 0, sr.Err()
diff --git a/server/service/mongo/history/dao.go 
b/server/service/mongo/history/dao.go
index a215d08..58e723a 100644
--- a/server/service/mongo/history/dao.go
+++ b/server/service/mongo/history/dao.go
@@ -99,10 +99,14 @@ func AddHistory(ctx context.Context, kv *model.KVDoc) error 
{
 func AddDeleteTime(ctx context.Context, kvIDs []string, project, domain 
string) error {
        collection := session.GetDB().Collection(session.CollectionKVRevision)
        now := time.Now()
-       filter := bson.D{{"id", bson.M{"$in": kvIDs}}, {"project", project}, 
{"domain", domain}}
+       filter := bson.D{
+               {Key: "id", Value: bson.M{"$in": kvIDs}},
+               {Key: "project", Value: project},
+               {Key: "domain", Value: domain},
+       }
        _, err := collection.UpdateMany(ctx, filter, bson.D{
-               {"$set", bson.D{
-                       {"delete_time", now},
+               {Key: "$set", Value: bson.D{
+                       {Key: "delete_time", Value: now},
                }},
        })
        if err != nil {
diff --git a/server/service/mongo/kv/kv_dao.go 
b/server/service/mongo/kv/kv_dao.go
index 378e559..38ab84f 100644
--- a/server/service/mongo/kv/kv_dao.go
+++ b/server/service/mongo/kv/kv_dao.go
@@ -83,12 +83,12 @@ func updateKeyValue(ctx context.Context, kv *model.KVDoc) 
error {
        }
        collection := session.GetDB().Collection(session.CollectionKV)
        ur, err := collection.UpdateOne(ctx, bson.M{"key": kv.Key, 
"label_format": kv.LabelFormat}, bson.D{
-               {"$set", bson.D{
-                       {"value", kv.Value},
-                       {"status", kv.Status},
-                       {"checker", kv.Checker},
-                       {"update_time", kv.UpdateTime},
-                       {"update_revision", kv.UpdateRevision},
+               {Key: "$set", Value: bson.D{
+                       {Key: "value", Value: kv.Value},
+                       {Key: "status", Value: kv.Status},
+                       {Key: "checker", Value: kv.Checker},
+                       {Key: "update_time", Value: kv.UpdateTime},
+                       {Key: "update_revision", Value: kv.UpdateRevision},
                }},
        })
        if err != nil {
@@ -119,7 +119,9 @@ func getValue(str string) string {
 
 func findKV(ctx context.Context, domain string, project string, opts 
service.FindOptions) (*mongo.Cursor, int, error) {
        collection := session.GetDB().Collection(session.CollectionKV)
-       ctx, _ = context.WithTimeout(ctx, opts.Timeout)
+       ctx, cancel := context.WithTimeout(ctx, opts.Timeout)
+       defer cancel()
+
        filter := bson.M{"domain": domain, "project": project}
        if opts.Key != "" {
                filter["key"] = opts.Key
@@ -216,7 +218,10 @@ func findOneKVAndDelete(ctx context.Context, kvID, 
project, domain string) (*mod
 
 //findKVsAndDelete deletes multiple kvs and return the deleted kv list as 
these appeared before deletion
 func findKVsAndDelete(ctx context.Context, kvIDs []string, project, domain 
string) ([]*model.KVDoc, error) {
-       filter := bson.D{{"id", bson.M{"$in": kvIDs}}, {"project", project}, 
{"domain", domain}}
+       filter := bson.D{
+               {Key: "id", Value: bson.M{"$in": kvIDs}},
+               {Key: "project", Value: project},
+               {Key: "domain", Value: domain}}
        kvs, err := findKeys(ctx, filter, false)
        if err != nil {
                if err != service.ErrKeyNotExists {
diff --git a/server/service/mongo/kv/kv_service.go 
b/server/service/mongo/kv/kv_service.go
index c1d0cae..eb085fc 100644
--- a/server/service/mongo/kv/kv_service.go
+++ b/server/service/mongo/kv/kv_service.go
@@ -19,10 +19,8 @@ package kv
 
 import (
        "context"
-       "github.com/apache/servicecomb-kie/pkg/stringutil"
-       "time"
-
        "github.com/apache/servicecomb-kie/pkg/model"
+       "github.com/apache/servicecomb-kie/pkg/stringutil"
        "github.com/apache/servicecomb-kie/pkg/util"
        "github.com/apache/servicecomb-kie/server/service"
        "github.com/apache/servicecomb-kie/server/service/mongo/session"
@@ -39,12 +37,13 @@ const (
 
 //Service operate data in mongodb
 type Service struct {
-       timeout time.Duration
 }
 
 //Create will create a key value record
 func (s *Service) Create(ctx context.Context, kv *model.KVDoc) (*model.KVDoc, 
error) {
-       ctx, _ = context.WithTimeout(ctx, session.Timeout)
+       ctx, cancel := context.WithTimeout(ctx, session.Timeout)
+       defer cancel()
+
        if kv.Labels == nil {
                kv.Labels = map[string]string{}
        }
@@ -72,7 +71,9 @@ func (s *Service) Create(ctx context.Context, kv 
*model.KVDoc) (*model.KVDoc, er
 
 //Update will update a key value record
 func (s *Service) Update(ctx context.Context, kv *model.UpdateKVRequest) 
(*model.KVDoc, error) {
-       ctx, _ = context.WithTimeout(ctx, session.Timeout)
+       ctx, cancel := context.WithTimeout(ctx, session.Timeout)
+       defer cancel()
+
        getRequest := &model.GetKVRequest{
                Domain:  kv.Domain,
                Project: kv.Project,
@@ -99,7 +100,9 @@ func (s *Service) Update(ctx context.Context, kv 
*model.UpdateKVRequest) (*model
 
 //Exist supports you query a key value by label map or labels id
 func (s *Service) Exist(ctx context.Context, domain, key string, project 
string, options ...service.FindOption) (*model.KVDoc, error) {
-       ctx, _ = context.WithTimeout(context.Background(), session.Timeout)
+       ctx, cancel := context.WithTimeout(ctx, session.Timeout)
+       defer cancel()
+
        opts := service.FindOptions{}
        for _, o := range options {
                o(&opts)
@@ -133,13 +136,15 @@ func (s *Service) Exist(ctx context.Context, domain, key 
string, project string,
 //FindOneAndDelete deletes one kv by id and return the deleted kv as these 
appeared before deletion
 //domain=tenant
 func (s *Service) FindOneAndDelete(ctx context.Context, kvID string, domain 
string, project string) (*model.KVDoc, error) {
-       ctx, _ = context.WithTimeout(context.Background(), session.Timeout)
+       ctx, cancel := context.WithTimeout(ctx, session.Timeout)
+       defer cancel()
        return findOneKVAndDelete(ctx, kvID, project, domain)
 }
 
 //FindManyAndDelete deletes multiple kvs and return the deleted kv list as 
these appeared before deletion
 func (s *Service) FindManyAndDelete(ctx context.Context, kvIDs []string, 
domain string, project string) ([]*model.KVDoc, error) {
-       ctx, _ = context.WithTimeout(context.Background(), session.Timeout)
+       ctx, cancel := context.WithTimeout(ctx, session.Timeout)
+       defer cancel()
        return findKVsAndDelete(ctx, kvIDs, project, domain)
 }
 
@@ -182,6 +187,7 @@ func (s *Service) Get(ctx context.Context, request 
*model.GetKVRequest) (*model.
 
 //Total return kv record number
 func (s *Service) Total(ctx context.Context, domain string) (int64, error) {
-       ctx, _ = context.WithTimeout(ctx, session.Timeout)
+       ctx, cancel := context.WithTimeout(ctx, session.Timeout)
+       defer cancel()
        return total(ctx, domain)
 }
diff --git a/server/service/mongo/session/session.go 
b/server/service/mongo/session/session.go
index a5e5b0b..48b6b90 100644
--- a/server/service/mongo/session/session.go
+++ b/server/service/mongo/session/session.go
@@ -103,20 +103,20 @@ func Init() error {
        once.Do(func() {
                sc, _ := 
bsoncodec.NewStructCodec(bsoncodec.DefaultStructTagParser)
                reg := bson.NewRegistryBuilder().
-                       RegisterEncoder(reflect.TypeOf(model.LabelDoc{}), sc).
-                       RegisterEncoder(reflect.TypeOf(model.KVDoc{}), sc).
+                       RegisterTypeEncoder(reflect.TypeOf(model.LabelDoc{}), 
sc).
+                       RegisterTypeEncoder(reflect.TypeOf(model.KVDoc{}), sc).
                        Build()
                uri := cipherutil.TryDecrypt(config.GetDB().URI)
                clientOps := 
[]*options.ClientOptions{options.Client().ApplyURI(uri)}
                if config.GetDB().SSLEnabled {
                        if config.GetDB().RootCA == "" {
-                               err = ErrRootCAMissing
+                               openlog.Error(ErrRootCAMissing.Error())
                                return
                        }
                        pool := x509.NewCertPool()
                        caCert, err := ioutil.ReadFile(config.GetDB().RootCA)
                        if err != nil {
-                               err = fmt.Errorf("read ca cert file %s failed", 
caCert)
+                               openlog.Error(fmt.Sprintf("read ca cert file %s 
failed", caCert))
                                return
                        }
                        pool.AppendCertsFromPEM(caCert)
@@ -133,7 +133,10 @@ func Init() error {
                        return
                }
                openlog.Info("DB connecting")
-               ctx, _ := context.WithTimeout(context.Background(), 
10*time.Second)
+
+               ctx, cancel := context.WithTimeout(context.Background(), 
10*time.Second)
+               defer cancel()
+
                err = client.Connect(ctx)
                if err != nil {
                        return
@@ -157,9 +160,9 @@ func GetDB() *mongo.Database {
 func CreateView(ctx context.Context, view, source string, pipeline 
mongo.Pipeline) error {
        sr := GetDB().RunCommand(ctx,
                bson.D{
-                       {"create", view},
-                       {"viewOn", source},
-                       {"pipeline", pipeline},
+                       {Key: "create", Value: view},
+                       {Key: "viewOn", Value: source},
+                       {Key: "pipeline", Value: pipeline},
                })
        if sr.Err() != nil {
                openlog.Error("can not create view: " + sr.Err().Error())
@@ -186,18 +189,16 @@ func GetColInfo(ctx context.Context, name string) 
(*CollectionInfo, error) {
                return nil, ErrGetPipeline
        }
        defer cur.Close(ctx)
-       for cur.Next(ctx) {
-               openlog.Debug(cur.Current.String())
-               c := &CollectionInfo{}
-               err := cur.Decode(c)
-               if err != nil {
-                       openlog.Error(err.Error())
-                       return nil, ErrGetPipeline
-               }
-               return c, nil
-               break
+       if !cur.Next(ctx) {
+               return nil, ErrGetPipeline
+       }
+       openlog.Debug(cur.Current.String())
+       c := &CollectionInfo{}
+       if err := cur.Decode(c); err != nil {
+               openlog.Error(err.Error())
+               return nil, ErrGetPipeline
        }
-       return nil, ErrGetPipeline
+       return c, nil
 }
 
 //EnsureDB build mongo db schema
diff --git a/server/service/mongo/track/polling_detail_dao.go 
b/server/service/mongo/track/polling_detail_dao.go
index 3380ebf..42bb812 100644
--- a/server/service/mongo/track/polling_detail_dao.go
+++ b/server/service/mongo/track/polling_detail_dao.go
@@ -45,7 +45,7 @@ func CreateOrUpdate(ctx context.Context, detail 
*model.PollingDetail) (*model.Po
                }
                return nil, res.Err()
        }
-       _, err := collection.UpdateOne(ctx, queryFilter, bson.D{{"$set", 
detail}})
+       _, err := collection.UpdateOne(ctx, queryFilter, bson.D{{Key: "$set", 
Value: detail}})
        if err != nil {
                return nil, err
        }
diff --git a/server/service/mongo/view/view_service.go 
b/server/service/mongo/view/view_service.go
index 4c40350..049e7ff 100644
--- a/server/service/mongo/view/view_service.go
+++ b/server/service/mongo/view/view_service.go
@@ -27,12 +27,10 @@ import (
        "go.mongodb.org/mongo-driver/bson"
        "go.mongodb.org/mongo-driver/mongo"
        "go.mongodb.org/mongo-driver/mongo/options"
-       "time"
 )
 
 //Service operate data in mongodb
 type Service struct {
-       timeout time.Duration
 }
 
 //Create insert a view data and create a mongo db view
@@ -42,8 +40,11 @@ func (s *Service) Create(ctx context.Context, viewDoc 
*model.ViewDoc, options ..
        }
        var pipeline mongo.Pipeline = []bson.D{
                {{
-                       "$match",
-                       bson.D{{"domain", viewDoc.Domain}, {"project", 
viewDoc.Project}},
+                       Key: "$match",
+                       Value: bson.D{
+                               {Key: "domain", Value: viewDoc.Domain},
+                               {Key: "project", Value: viewDoc.Project},
+                       },
                }},
        }
        opts := service.FindOptions{}
@@ -51,11 +52,11 @@ func (s *Service) Create(ctx context.Context, viewDoc 
*model.ViewDoc, options ..
                o(&opts)
        }
        if opts.Key != "" {
-               pipeline = append(pipeline, bson.D{{"$match", bson.D{{"key", 
opts.Key}}}})
+               pipeline = append(pipeline, bson.D{{Key: "$match", Value: 
bson.D{{Key: "key", Value: opts.Key}}}})
        }
        if len(opts.Labels) != 0 {
                for k, v := range opts.Labels {
-                       pipeline = append(pipeline, bson.D{{"$match", 
bson.D{{"labels." + k, v}}}})
+                       pipeline = append(pipeline, bson.D{{Key: "$match", 
Value: bson.D{{Key: "labels." + k, Value: v}}}})
                }
        }
        viewDoc.ID = uuid.NewV4().String()
@@ -98,9 +99,9 @@ func (s *Service) Update(ctx context.Context, viewDoc 
*model.ViewDoc) error {
                "project": oldView.Project,
                "id":      oldView.ID},
                bson.D{
-                       {"$set", bson.D{
-                               {"name", oldView.Display},
-                               {"criteria", oldView.Criteria},
+                       {Key: "$set", Value: bson.D{
+                               {Key: "name", Value: oldView.Display},
+                               {Key: "criteria", Value: oldView.Criteria},
                        }},
                })
        if err != nil {

Reply via email to