This is an automated email from the ASF dual-hosted git repository.

tianxiaoliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/servicecomb-kie.git


The following commit(s) were added to refs/heads/master by this push:
     new 91d97ed  kv history support rotate (#121)
91d97ed is described below

commit 91d97ed1b94f59b4f0c87cedf66bf98a7e689f1d
Author: zhulijian <[email protected]>
AuthorDate: Mon Mar 23 16:08:16 2020 +0800

    kv history support rotate (#121)
---
 deployments/db.js                       |  3 +-
 pkg/model/db_schema.go                  |  4 +-
 server/service/mongo/history/dao.go     | 80 +++++++++++++++++++++++++++++++++
 server/service/mongo/kv/kv_dao.go       |  6 ++-
 server/service/mongo/session/session.go | 12 ++++-
 5 files changed, 99 insertions(+), 6 deletions(-)

diff --git a/deployments/db.js b/deployments/db.js
index a3304df..fd4479d 100644
--- a/deployments/db.js
+++ b/deployments/db.js
@@ -55,7 +55,7 @@ db.createCollection( "kv", {
             }
         } }
 } );
-
+db.createCollection("kv_revision");
 db.createCollection( "label", {
     validator: { $jsonSchema: {
             bsonType: "object",
@@ -135,6 +135,7 @@ db.createCollection( "polling_detail", {
 //index
 db.kv.createIndex({"id": 1}, { unique: true } );
 db.kv.createIndex({key: 1, label_id: 1,domain:1,project:1},{ unique: true });
+db.kv_revision.createIndex( { "delete_time": 1 }, { expireAfterSeconds: 7 * 24 
* 3600 } );
 db.label.createIndex({"id": 1}, { unique: true } );
 db.label.createIndex({format: 1,domain:1,project:1},{ unique: true });
 db.polling_detail.createIndex({"id": 1}, { unique: true } );
diff --git a/pkg/model/db_schema.go b/pkg/model/db_schema.go
index e786a85..c2fb6ad 100644
--- a/pkg/model/db_schema.go
+++ b/pkg/model/db_schema.go
@@ -39,8 +39,8 @@ type KVDoc struct {
        UpdateRevision int64  `json:"update_revision,omitempty" 
bson:"update_revision," yaml:"update_revision,omitempty"`
        Project        string `json:"project,omitempty" 
yaml:"project,omitempty"`
        Status         string `json:"status,omitempty" yaml:"status,omitempty"`
-       CreatTime      string `json:"create_time,omitempty" 
yaml:"create_time,omitempty"`
-       UpdateTime     string `json:"update_time,omitempty" 
yaml:"update_time,omitempty"`
+       CreateTime     string `json:"create_time,omitempty" bson:"create_time," 
yaml:"create_time,omitempty"`
+       UpdateTime     string `json:"update_time,omitempty" bson:"update_time," 
yaml:"update_time,omitempty"`
 
        Labels map[string]string `json:"labels,omitempty" 
yaml:"labels,omitempty"` //redundant
        Domain string            `json:"domain,omitempty" 
yaml:"domain,omitempty"` //redundant
diff --git a/server/service/mongo/history/dao.go 
b/server/service/mongo/history/dao.go
index 86af4a8..414908d 100644
--- a/server/service/mongo/history/dao.go
+++ b/server/service/mongo/history/dao.go
@@ -19,6 +19,9 @@ package history
 
 import (
        "context"
+       "fmt"
+       "time"
+
        "github.com/apache/servicecomb-kie/pkg/model"
        "github.com/apache/servicecomb-kie/server/service"
        "github.com/apache/servicecomb-kie/server/service/mongo/session"
@@ -27,6 +30,11 @@ import (
        "go.mongodb.org/mongo-driver/mongo/options"
 )
 
+//const of history
+const (
+       maxHistoryNum = 100
+)
+
 func getHistoryByKeyID(ctx context.Context, filter bson.M, offset, limit 
int64) ([]*model.KVDoc, int, error) {
        collection := session.GetDB().Collection(session.CollectionKVRevision)
        opt := options.Find().SetSort(map[string]interface{}{
@@ -72,5 +80,77 @@ func AddHistory(ctx context.Context, kv *model.KVDoc) error {
                openlogging.Error(err.Error())
                return err
        }
+       err = historyRotate(ctx, kv.ID, kv.Project, kv.Domain)
+       if err != nil {
+               openlogging.Error("history rotate err: " + err.Error())
+               return err
+       }
+       return nil
+}
+
+//AddDeleteTime add delete time to all revisions of the kv,
+//thus these revisions will be automatically deleted by TTL index.
+func AddDeleteTime(ctx context.Context, kvID, project, domain string) error {
+       collection := session.GetDB().Collection(session.CollectionKVRevision)
+       now := time.Now()
+       _, err := collection.UpdateMany(ctx, bson.M{"id": kvID, "project": 
project, "domain": domain}, bson.D{
+               {"$set", bson.D{
+                       {"delete_time", now},
+               }},
+       })
+       if err != nil {
+               return err
+       }
+       openlogging.Debug(fmt.Sprintf("added delete time [%s] to key [%s]", 
now.String(), kvID))
+       return nil
+}
+
+//historyRotate delete historical versions for a key that exceeds the limited 
number
+func historyRotate(ctx context.Context, kvID, project, domain string) error {
+       ctx, cancel := context.WithTimeout(ctx, session.Timeout)
+       defer cancel()
+       filter := bson.M{"id": kvID, "domain": domain, "project": project}
+       collection := session.GetDB().Collection(session.CollectionKVRevision)
+       curTotal, err := collection.CountDocuments(ctx, filter)
+       if err != nil {
+               return err
+       }
+       if curTotal <= maxHistoryNum {
+               return nil
+       }
+       opt := options.Find().SetSort(map[string]interface{}{
+               "update_revision": 1,
+       })
+       opt = opt.SetLimit(curTotal - maxHistoryNum)
+       cur, err := collection.Find(ctx, filter, opt)
+       if err != nil {
+               return err
+       }
+       defer cur.Close(ctx)
+       if cur.Err() != nil {
+               return err
+       }
+       for cur.Next(ctx) {
+               curKV := &model.KVDoc{}
+               if err := cur.Decode(curKV); err != nil {
+                       openlogging.Error("decode to KVs error: " + err.Error())
+                       return err
+               }
+               _, err := collection.DeleteOne(ctx, bson.M{
+                       "id":              kvID,
+                       "domain":          domain,
+                       "project":         project,
+                       "update_revision": curKV.UpdateRevision,
+               })
+               if err != nil {
+                       return err
+               }
+               openlogging.Debug("delete overflowed revision", 
openlogging.WithTags(openlogging.Tags{
+                       "id":       curKV.ID,
+                       "key":      curKV.Key,
+                       "revision": curKV.UpdateRevision,
+               }))
+       }
+
        return nil
 }
diff --git a/server/service/mongo/kv/kv_dao.go 
b/server/service/mongo/kv/kv_dao.go
index 8c7ddb9..059264c 100644
--- a/server/service/mongo/kv/kv_dao.go
+++ b/server/service/mongo/kv/kv_dao.go
@@ -48,7 +48,7 @@ func createKey(ctx context.Context, kv *model.KVDoc) 
(*model.KVDoc, error) {
        }
        kv.UpdateRevision = revision
        kv.CreateRevision = revision
-       kv.CreatTime = time.Now().String()
+       kv.CreateTime = time.Now().String()
        kv.UpdateTime = time.Now().String()
        _, err = collection.InsertOne(ctx, kv)
        if err != nil {
@@ -183,6 +183,10 @@ func deleteKV(ctx context.Context, kvID, project, domain 
string) error {
        } else {
                openlogging.Info(fmt.Sprintf("delete success,kvID=%s", kvID))
        }
+       err = history.AddDeleteTime(ctx, kvID, project, domain)
+       if err != nil {
+               openlogging.Error(fmt.Sprintf("add delete time to [%s] failed : 
[%s]", kvID, err))
+       }
        return err
 }
 func findKeys(ctx context.Context, filter bson.M, withoutLabel bool) 
([]*model.KVDoc, error) {
diff --git a/server/service/mongo/session/session.go 
b/server/service/mongo/session/session.go
index fa43f9e..3a5c06c 100644
--- a/server/service/mongo/session/session.go
+++ b/server/service/mongo/session/session.go
@@ -206,7 +206,7 @@ func InitMongodb() {
                panic(err)
        }
        //kv
-       c = session.DB("kie").C("kv")
+       c = session.DB(DBName).C("kv")
        err = c.Create(&mgo.CollectionInfo{Validator: bson.M{
                "key":     bson.M{"$exists": true},
                "domain":  bson.M{"$exists": true},
@@ -227,7 +227,15 @@ func InitMongodb() {
        if err != nil {
                panic(err)
        }
-
+       //kv_revision
+       c = session.DB(DBName).C(CollectionKVRevision)
+       err = c.EnsureIndex(mgo.Index{
+               Key:         []string{"delete_time"},
+               ExpireAfter: 7 * 24 * time.Hour,
+       })
+       if err != nil {
+               panic(err)
+       }
        //label
        c = session.DB(DBName).C(CollectionLabel)
        err = c.Create(&mgo.CollectionInfo{Validator: bson.M{

Reply via email to