This is an automated email from the ASF dual-hosted git repository.
tianxiaoliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/servicecomb-kie.git
The following commit(s) were added to refs/heads/master by this push:
new 91d97ed kv history support rotate (#121)
91d97ed is described below
commit 91d97ed1b94f59b4f0c87cedf66bf98a7e689f1d
Author: zhulijian <[email protected]>
AuthorDate: Mon Mar 23 16:08:16 2020 +0800
kv history support rotate (#121)
---
deployments/db.js | 3 +-
pkg/model/db_schema.go | 4 +-
server/service/mongo/history/dao.go | 80 +++++++++++++++++++++++++++++++++
server/service/mongo/kv/kv_dao.go | 6 ++-
server/service/mongo/session/session.go | 12 ++++-
5 files changed, 99 insertions(+), 6 deletions(-)
diff --git a/deployments/db.js b/deployments/db.js
index a3304df..fd4479d 100644
--- a/deployments/db.js
+++ b/deployments/db.js
@@ -55,7 +55,7 @@ db.createCollection( "kv", {
}
} }
} );
-
+db.createCollection("kv_revision");
db.createCollection( "label", {
validator: { $jsonSchema: {
bsonType: "object",
@@ -135,6 +135,7 @@ db.createCollection( "polling_detail", {
//index
db.kv.createIndex({"id": 1}, { unique: true } );
db.kv.createIndex({key: 1, label_id: 1,domain:1,project:1},{ unique: true });
+db.kv_revision.createIndex( { "delete_time": 1 }, { expireAfterSeconds: 7 * 24
* 3600 } );
db.label.createIndex({"id": 1}, { unique: true } );
db.label.createIndex({format: 1,domain:1,project:1},{ unique: true });
db.polling_detail.createIndex({"id": 1}, { unique: true } );
diff --git a/pkg/model/db_schema.go b/pkg/model/db_schema.go
index e786a85..c2fb6ad 100644
--- a/pkg/model/db_schema.go
+++ b/pkg/model/db_schema.go
@@ -39,8 +39,8 @@ type KVDoc struct {
UpdateRevision int64 `json:"update_revision,omitempty"
bson:"update_revision," yaml:"update_revision,omitempty"`
Project string `json:"project,omitempty"
yaml:"project,omitempty"`
Status string `json:"status,omitempty" yaml:"status,omitempty"`
- CreatTime string `json:"create_time,omitempty"
yaml:"create_time,omitempty"`
- UpdateTime string `json:"update_time,omitempty"
yaml:"update_time,omitempty"`
+ CreateTime string `json:"create_time,omitempty" bson:"create_time,"
yaml:"create_time,omitempty"`
+ UpdateTime string `json:"update_time,omitempty" bson:"update_time,"
yaml:"update_time,omitempty"`
Labels map[string]string `json:"labels,omitempty"
yaml:"labels,omitempty"` //redundant
Domain string `json:"domain,omitempty"
yaml:"domain,omitempty"` //redundant
diff --git a/server/service/mongo/history/dao.go
b/server/service/mongo/history/dao.go
index 86af4a8..414908d 100644
--- a/server/service/mongo/history/dao.go
+++ b/server/service/mongo/history/dao.go
@@ -19,6 +19,9 @@ package history
import (
"context"
+ "fmt"
+ "time"
+
"github.com/apache/servicecomb-kie/pkg/model"
"github.com/apache/servicecomb-kie/server/service"
"github.com/apache/servicecomb-kie/server/service/mongo/session"
@@ -27,6 +30,11 @@ import (
"go.mongodb.org/mongo-driver/mongo/options"
)
+//const of history
+const (
+ maxHistoryNum = 100
+)
+
func getHistoryByKeyID(ctx context.Context, filter bson.M, offset, limit
int64) ([]*model.KVDoc, int, error) {
collection := session.GetDB().Collection(session.CollectionKVRevision)
opt := options.Find().SetSort(map[string]interface{}{
@@ -72,5 +80,77 @@ func AddHistory(ctx context.Context, kv *model.KVDoc) error {
openlogging.Error(err.Error())
return err
}
+ err = historyRotate(ctx, kv.ID, kv.Project, kv.Domain)
+ if err != nil {
+ openlogging.Error("history rotate err: " + err.Error())
+ return err
+ }
+ return nil
+}
+
+//AddDeleteTime add delete time to all revisions of the kv,
+//thus these revisions will be automatically deleted by TTL index.
+func AddDeleteTime(ctx context.Context, kvID, project, domain string) error {
+ collection := session.GetDB().Collection(session.CollectionKVRevision)
+ now := time.Now()
+ _, err := collection.UpdateMany(ctx, bson.M{"id": kvID, "project":
project, "domain": domain}, bson.D{
+ {"$set", bson.D{
+ {"delete_time", now},
+ }},
+ })
+ if err != nil {
+ return err
+ }
+ openlogging.Debug(fmt.Sprintf("added delete time [%s] to key [%s]",
now.String(), kvID))
+ return nil
+}
+
+//historyRotate delete historical versions for a key that exceeds the limited
number
+func historyRotate(ctx context.Context, kvID, project, domain string) error {
+ ctx, cancel := context.WithTimeout(ctx, session.Timeout)
+ defer cancel()
+ filter := bson.M{"id": kvID, "domain": domain, "project": project}
+ collection := session.GetDB().Collection(session.CollectionKVRevision)
+ curTotal, err := collection.CountDocuments(ctx, filter)
+ if err != nil {
+ return err
+ }
+ if curTotal <= maxHistoryNum {
+ return nil
+ }
+ opt := options.Find().SetSort(map[string]interface{}{
+ "update_revision": 1,
+ })
+ opt = opt.SetLimit(curTotal - maxHistoryNum)
+ cur, err := collection.Find(ctx, filter, opt)
+ if err != nil {
+ return err
+ }
+ defer cur.Close(ctx)
+ if cur.Err() != nil {
+ return err
+ }
+ for cur.Next(ctx) {
+ curKV := &model.KVDoc{}
+ if err := cur.Decode(curKV); err != nil {
+ openlogging.Error("decode to KVs error: " + err.Error())
+ return err
+ }
+ _, err := collection.DeleteOne(ctx, bson.M{
+ "id": kvID,
+ "domain": domain,
+ "project": project,
+ "update_revision": curKV.UpdateRevision,
+ })
+ if err != nil {
+ return err
+ }
+ openlogging.Debug("delete overflowed revision",
openlogging.WithTags(openlogging.Tags{
+ "id": curKV.ID,
+ "key": curKV.Key,
+ "revision": curKV.UpdateRevision,
+ }))
+ }
+
return nil
}
diff --git a/server/service/mongo/kv/kv_dao.go
b/server/service/mongo/kv/kv_dao.go
index 8c7ddb9..059264c 100644
--- a/server/service/mongo/kv/kv_dao.go
+++ b/server/service/mongo/kv/kv_dao.go
@@ -48,7 +48,7 @@ func createKey(ctx context.Context, kv *model.KVDoc)
(*model.KVDoc, error) {
}
kv.UpdateRevision = revision
kv.CreateRevision = revision
- kv.CreatTime = time.Now().String()
+ kv.CreateTime = time.Now().String()
kv.UpdateTime = time.Now().String()
_, err = collection.InsertOne(ctx, kv)
if err != nil {
@@ -183,6 +183,10 @@ func deleteKV(ctx context.Context, kvID, project, domain
string) error {
} else {
openlogging.Info(fmt.Sprintf("delete success,kvID=%s", kvID))
}
+ err = history.AddDeleteTime(ctx, kvID, project, domain)
+ if err != nil {
+ openlogging.Error(fmt.Sprintf("add delete time to [%s] failed :
[%s]", kvID, err))
+ }
return err
}
func findKeys(ctx context.Context, filter bson.M, withoutLabel bool)
([]*model.KVDoc, error) {
diff --git a/server/service/mongo/session/session.go
b/server/service/mongo/session/session.go
index fa43f9e..3a5c06c 100644
--- a/server/service/mongo/session/session.go
+++ b/server/service/mongo/session/session.go
@@ -206,7 +206,7 @@ func InitMongodb() {
panic(err)
}
//kv
- c = session.DB("kie").C("kv")
+ c = session.DB(DBName).C("kv")
err = c.Create(&mgo.CollectionInfo{Validator: bson.M{
"key": bson.M{"$exists": true},
"domain": bson.M{"$exists": true},
@@ -227,7 +227,15 @@ func InitMongodb() {
if err != nil {
panic(err)
}
-
+ //kv_revision
+ c = session.DB(DBName).C(CollectionKVRevision)
+ err = c.EnsureIndex(mgo.Index{
+ Key: []string{"delete_time"},
+ ExpireAfter: 7 * 24 * time.Hour,
+ })
+ if err != nil {
+ panic(err)
+ }
//label
c = session.DB(DBName).C(CollectionLabel)
err = c.Create(&mgo.CollectionInfo{Validator: bson.M{