This is an automated email from the ASF dual-hosted git repository.
littlecui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/servicecomb-service-center.git
The following commit(s) were added to refs/heads/master by this push:
new 860af37 #1228 support kv timestamp (#1254)
860af37 is described below
commit 860af37511cc9bb2be247c72267f80df3b5d50aa
Author: aseTo2016 <[email protected]>
AuthorDate: Thu Feb 10 11:03:24 2022 +0800
#1228 support kv timestamp (#1254)
* #1228 support kv timestamp
* #1228 support kv timestamp
Co-authored-by: aseTo2016 <tys201193111>
---
syncer/service/replicator/resource/account.go | 4 +--
syncer/service/replicator/resource/instance.go | 4 +--
syncer/service/replicator/resource/kv.go | 31 +++++++++++++++++++++-
syncer/service/replicator/resource/kv_test.go | 29 ++++++++++++++++++++
syncer/service/replicator/resource/microservice.go | 4 +--
syncer/service/replicator/resource/resource.go | 17 ++++++++----
.../service/replicator/resource/resource_test.go | 13 +++++----
syncer/service/replicator/resource/role.go | 4 +--
syncer/service/task/manager.go | 5 +---
9 files changed, 86 insertions(+), 25 deletions(-)
diff --git a/syncer/service/replicator/resource/account.go b/syncer/service/replicator/resource/account.go
index 68b12ad..a36bfcd 100644
--- a/syncer/service/replicator/resource/account.go
+++ b/syncer/service/replicator/resource/account.go
@@ -80,8 +80,8 @@ func (a *account) NeedOperate(ctx context.Context) *Result {
c := &checker{
curNotNil: a.cur != nil,
event: a.event,
- updateTime: func() string {
- return a.cur.UpdateTime
+ updateTime: func() (int64, error) {
+ return formatUpdateTimeSecond(a.cur.UpdateTime)
},
resourceID: a.input.Name,
}
diff --git a/syncer/service/replicator/resource/instance.go b/syncer/service/replicator/resource/instance.go
index 16fdd04..430b207 100644
--- a/syncer/service/replicator/resource/instance.go
+++ b/syncer/service/replicator/resource/instance.go
@@ -159,8 +159,8 @@ func (i *instance) NeedOperate(ctx context.Context) *Result {
c := &checker{
curNotNil: i.cur != nil,
event: i.event,
- updateTime: func() string {
- return i.cur.ModTimestamp
+ updateTime: func() (int64, error) {
+ return formatUpdateTimeSecond(i.cur.ModTimestamp)
},
resourceID: "",
}
diff --git a/syncer/service/replicator/resource/kv.go b/syncer/service/replicator/resource/kv.go
index bac1f98..b3acb79 100644
--- a/syncer/service/replicator/resource/kv.go
+++ b/syncer/service/replicator/resource/kv.go
@@ -2,9 +2,12 @@ package resource
import (
"context"
+ "encoding/json"
"errors"
+ "fmt"
"sync"
+ "github.com/apache/servicecomb-service-center/pkg/log"
v1sync "github.com/apache/servicecomb-service-center/syncer/api/v1"
"github.com/little-cui/etcdadpt"
@@ -12,6 +15,8 @@ import (
const (
KV = "kv"
+
+ ComparableKey = "comparable"
)
const (
@@ -63,11 +68,35 @@ func (k *kv) LoadCurrentResource(ctx context.Context) *Result {
return nil
}
+type Value struct {
+ Timestamp int64 `json:"$timestamp"`
+}
+
+func (k *kv) getUpdateTime() (int64, error) {
+ if k.cur == nil {
+ return 0, nil
+ }
+
+ comparable, ok := k.event.Opts[ComparableKey]
+ if !ok || comparable != "true" {
+ return 0, nil
+ }
+
+ v := new(Value)
+ err := json.Unmarshal(k.cur, v)
+ if err != nil {
+ log.Warn(fmt.Sprintf("unmarshal kv %s value failed, err %s", k.key, err.Error()))
+ return 0, err
+ }
+
+ return v.Timestamp, nil
+}
+
func (k *kv) NeedOperate(ctx context.Context) *Result {
c := &checker{
curNotNil: k.cur != nil,
event: k.event,
- updateTime: nil,
+ updateTime: k.getUpdateTime,
resourceID: k.key,
}
c.tombstoneLoader = c
diff --git a/syncer/service/replicator/resource/kv_test.go b/syncer/service/replicator/resource/kv_test.go
index 34ce8cb..02a6fc0 100644
--- a/syncer/service/replicator/resource/kv_test.go
+++ b/syncer/service/replicator/resource/kv_test.go
@@ -2,6 +2,7 @@ package resource
import (
"context"
+ "fmt"
"testing"
"time"
@@ -161,3 +162,31 @@ func (f *mockKVManager) Delete(_ context.Context, key string) error {
delete(f.kvs, key)
return nil
}
+
+func Test_kv_getUpdateTime(t *testing.T) {
+ now := time.Now().Unix()
+ data := []byte(fmt.Sprintf(`{"$timestamp": %d}`, now))
+ k := &kv{
+ key: "hello",
+ event: &v1sync.Event{
+ Opts: map[string]string{
+ ComparableKey: "true",
+ },
+ },
+ cur: data,
+ }
+ got, err := k.getUpdateTime()
+ if assert.Nil(t, err) {
+ assert.Equal(t, now, got)
+ }
+
+ k = &kv{
+ key: "hello",
+ event: &v1sync.Event{},
+ cur: data,
+ }
+ got, err = k.getUpdateTime()
+ if assert.Nil(t, err) {
+ assert.Equal(t, 0, got)
+ }
+}
diff --git a/syncer/service/replicator/resource/microservice.go b/syncer/service/replicator/resource/microservice.go
index 3e17ac1..3e18a15 100644
--- a/syncer/service/replicator/resource/microservice.go
+++ b/syncer/service/replicator/resource/microservice.go
@@ -92,8 +92,8 @@ func (m *microservice) NeedOperate(ctx context.Context) *Result {
c := &checker{
curNotNil: m.cur != nil,
event: m.event,
- updateTime: func() string {
- return m.cur.ModTimestamp
+ updateTime: func() (int64, error) {
+ return formatUpdateTimeSecond(m.cur.ModTimestamp)
},
resourceID: m.serviceID,
}
diff --git a/syncer/service/replicator/resource/resource.go b/syncer/service/replicator/resource/resource.go
index 7acf09b..5087b05 100644
--- a/syncer/service/replicator/resource/resource.go
+++ b/syncer/service/replicator/resource/resource.go
@@ -278,25 +278,32 @@ type checker struct {
curNotNil bool
event *v1sync.Event
- updateTime func() string
+ updateTime func() (int64, error)
resourceID string
tombstoneLoader tombstoneLoader
}
+func formatUpdateTimeSecond(src string) (int64, error) {
+ updateTime, err := strconv.ParseInt(src, 0, 0)
+ if err != nil {
+ return 0, err
+ }
+
+ return updateTime * 1000 * 1000 * 1000, nil
+}
+
func (o *checker) needOperate(ctx context.Context) *Result {
if o.curNotNil {
if o.updateTime == nil {
return nil
}
- updateTime, err := strconv.ParseInt(o.updateTime(), 0, 0)
+ updateTime, err := o.updateTime()
if err != nil {
- log.Error("parse update time failed", err)
+ log.Error("get update time failed", err)
return FailResult(err)
}
-
- updateTime = updateTime * 1000 * 1000 * 1000
if updateTime >= o.event.Timestamp {
return SkipResult()
}
diff --git a/syncer/service/replicator/resource/resource_test.go b/syncer/service/replicator/resource/resource_test.go
index 148dea7..5586c68 100644
--- a/syncer/service/replicator/resource/resource_test.go
+++ b/syncer/service/replicator/resource/resource_test.go
@@ -3,7 +3,6 @@ package resource
import (
"context"
"errors"
- "strconv"
"testing"
"time"
@@ -185,8 +184,8 @@ func TestNeedOperate(t *testing.T) {
c := &checker{
curNotNil: true,
event: e,
- updateTime: func() string {
- return strconv.FormatInt(time.Now().Add(-time.Minute).Unix(), 10)
+ updateTime: func() (int64, error) {
+ return time.Now().Add(-time.Minute).Unix(), nil
},
resourceID: "",
}
@@ -194,8 +193,8 @@ func TestNeedOperate(t *testing.T) {
r := c.needOperate(ctx)
assert.Nil(t, r)
- c.updateTime = func() string {
- return strconv.FormatInt(time.Now().Add(time.Minute).Unix(), 10)
+ c.updateTime = func() (int64, error) {
+ return time.Now().Add(time.Minute).Unix(), nil
}
r = c.needOperate(ctx)
@@ -216,8 +215,8 @@ func TestNeedOperate(t *testing.T) {
c := &checker{
curNotNil: false,
event: e,
- updateTime: func() string {
- return strconv.FormatInt(time.Now().Add(-time.Minute).Unix(), 10)
+ updateTime: func() (int64, error) {
+ return time.Now().Add(-time.Minute).Unix(), nil
},
resourceID: "",
}
diff --git a/syncer/service/replicator/resource/role.go b/syncer/service/replicator/resource/role.go
index ea91512..d2a8df9 100644
--- a/syncer/service/replicator/resource/role.go
+++ b/syncer/service/replicator/resource/role.go
@@ -89,8 +89,8 @@ func (r *role) NeedOperate(ctx context.Context) *Result {
checker := &checker{
curNotNil: r.cur != nil,
event: r.event,
- updateTime: func() string {
- return r.cur.UpdateTime
+ updateTime: func() (int64, error) {
+ return formatUpdateTimeSecond(r.cur.UpdateTime)
},
resourceID: r.input.Name,
}
diff --git a/syncer/service/task/manager.go b/syncer/service/task/manager.go
index 5c51c32..e03e7bc 100644
--- a/syncer/service/task/manager.go
+++ b/syncer/service/task/manager.go
@@ -227,10 +227,7 @@ func (m *manager) handleResult(res *event.Result) {
if res.Error != nil || res.Data.Code == resource.Fail {
log.Error(fmt.Sprintf("get task %s result, return error", res.ID), res.Error)
m.cache.Range(func(key, value interface{}) bool {
- if res.ID == key {
- m.cache.Delete(key)
- return false
- }
+ m.cache.Delete(key)
return true
})
return