This is an automated email from the ASF dual-hosted git repository.

littlecui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/servicecomb-service-center.git


The following commit(s) were added to refs/heads/master by this push:
     new 860af37  #1228 support kv timestamp (#1254)
860af37 is described below

commit 860af37511cc9bb2be247c72267f80df3b5d50aa
Author: aseTo2016 <[email protected]>
AuthorDate: Thu Feb 10 11:03:24 2022 +0800

    #1228 support kv timestamp (#1254)
    
    * #1228 support kv timestamp
    
    * #1228 support kv timestamp
    
    Co-authored-by: aseTo2016 <tys201193111>
---
 syncer/service/replicator/resource/account.go      |  4 +--
 syncer/service/replicator/resource/instance.go     |  4 +--
 syncer/service/replicator/resource/kv.go           | 31 +++++++++++++++++++++-
 syncer/service/replicator/resource/kv_test.go      | 29 ++++++++++++++++++++
 syncer/service/replicator/resource/microservice.go |  4 +--
 syncer/service/replicator/resource/resource.go     | 17 ++++++++----
 .../service/replicator/resource/resource_test.go   | 13 +++++----
 syncer/service/replicator/resource/role.go         |  4 +--
 syncer/service/task/manager.go                     |  5 +---
 9 files changed, 86 insertions(+), 25 deletions(-)

diff --git a/syncer/service/replicator/resource/account.go b/syncer/service/replicator/resource/account.go
index 68b12ad..a36bfcd 100644
--- a/syncer/service/replicator/resource/account.go
+++ b/syncer/service/replicator/resource/account.go
@@ -80,8 +80,8 @@ func (a *account) NeedOperate(ctx context.Context) *Result {
        c := &checker{
                curNotNil: a.cur != nil,
                event:     a.event,
-               updateTime: func() string {
-                       return a.cur.UpdateTime
+               updateTime: func() (int64, error) {
+                       return formatUpdateTimeSecond(a.cur.UpdateTime)
                },
                resourceID: a.input.Name,
        }
diff --git a/syncer/service/replicator/resource/instance.go b/syncer/service/replicator/resource/instance.go
index 16fdd04..430b207 100644
--- a/syncer/service/replicator/resource/instance.go
+++ b/syncer/service/replicator/resource/instance.go
@@ -159,8 +159,8 @@ func (i *instance) NeedOperate(ctx context.Context) *Result {
        c := &checker{
                curNotNil: i.cur != nil,
                event:     i.event,
-               updateTime: func() string {
-                       return i.cur.ModTimestamp
+               updateTime: func() (int64, error) {
+                       return formatUpdateTimeSecond(i.cur.ModTimestamp)
                },
                resourceID: "",
        }
diff --git a/syncer/service/replicator/resource/kv.go b/syncer/service/replicator/resource/kv.go
index bac1f98..b3acb79 100644
--- a/syncer/service/replicator/resource/kv.go
+++ b/syncer/service/replicator/resource/kv.go
@@ -2,9 +2,12 @@ package resource
 
 import (
        "context"
+       "encoding/json"
        "errors"
+       "fmt"
        "sync"
 
+       "github.com/apache/servicecomb-service-center/pkg/log"
        v1sync "github.com/apache/servicecomb-service-center/syncer/api/v1"
 
        "github.com/little-cui/etcdadpt"
@@ -12,6 +15,8 @@ import (
 
 const (
        KV = "kv"
+
+       ComparableKey = "comparable"
 )
 
 const (
@@ -63,11 +68,35 @@ func (k *kv) LoadCurrentResource(ctx context.Context) *Result {
        return nil
 }
 
+type Value struct {
+       Timestamp int64 `json:"$timestamp"`
+}
+
+func (k *kv) getUpdateTime() (int64, error) {
+       if k.cur == nil {
+               return 0, nil
+       }
+
+       comparable, ok := k.event.Opts[ComparableKey]
+       if !ok || comparable != "true" {
+               return 0, nil
+       }
+
+       v := new(Value)
+       err := json.Unmarshal(k.cur, v)
+       if err != nil {
+               log.Warn(fmt.Sprintf("unmarshal kv %s value failed, err %s", 
k.key, err.Error()))
+               return 0, err
+       }
+
+       return v.Timestamp, nil
+}
+
 func (k *kv) NeedOperate(ctx context.Context) *Result {
        c := &checker{
                curNotNil:  k.cur != nil,
                event:      k.event,
-               updateTime: nil,
+               updateTime: k.getUpdateTime,
                resourceID: k.key,
        }
        c.tombstoneLoader = c
diff --git a/syncer/service/replicator/resource/kv_test.go b/syncer/service/replicator/resource/kv_test.go
index 34ce8cb..02a6fc0 100644
--- a/syncer/service/replicator/resource/kv_test.go
+++ b/syncer/service/replicator/resource/kv_test.go
@@ -2,6 +2,7 @@ package resource
 
 import (
        "context"
+       "fmt"
        "testing"
        "time"
 
@@ -161,3 +162,31 @@ func (f *mockKVManager) Delete(_ context.Context, key string) error {
        delete(f.kvs, key)
        return nil
 }
+
+func Test_kv_getUpdateTime(t *testing.T) {
+       now := time.Now().Unix()
+       data := []byte(fmt.Sprintf(`{"$timestamp": %d}`, now))
+       k := &kv{
+               key: "hello",
+               event: &v1sync.Event{
+                       Opts: map[string]string{
+                               ComparableKey: "true",
+                       },
+               },
+               cur: data,
+       }
+       got, err := k.getUpdateTime()
+       if assert.Nil(t, err) {
+               assert.Equal(t, now, got)
+       }
+
+       k = &kv{
+               key:   "hello",
+               event: &v1sync.Event{},
+               cur:   data,
+       }
+       got, err = k.getUpdateTime()
+       if assert.Nil(t, err) {
+               assert.Equal(t, 0, got)
+       }
+}
diff --git a/syncer/service/replicator/resource/microservice.go b/syncer/service/replicator/resource/microservice.go
index 3e17ac1..3e18a15 100644
--- a/syncer/service/replicator/resource/microservice.go
+++ b/syncer/service/replicator/resource/microservice.go
@@ -92,8 +92,8 @@ func (m *microservice) NeedOperate(ctx context.Context) *Result {
        c := &checker{
                curNotNil: m.cur != nil,
                event:     m.event,
-               updateTime: func() string {
-                       return m.cur.ModTimestamp
+               updateTime: func() (int64, error) {
+                       return formatUpdateTimeSecond(m.cur.ModTimestamp)
                },
                resourceID: m.serviceID,
        }
diff --git a/syncer/service/replicator/resource/resource.go b/syncer/service/replicator/resource/resource.go
index 7acf09b..5087b05 100644
--- a/syncer/service/replicator/resource/resource.go
+++ b/syncer/service/replicator/resource/resource.go
@@ -278,25 +278,32 @@ type checker struct {
        curNotNil bool
 
        event      *v1sync.Event
-       updateTime func() string
+       updateTime func() (int64, error)
        resourceID string
 
        tombstoneLoader tombstoneLoader
 }
 
+func formatUpdateTimeSecond(src string) (int64, error) {
+       updateTime, err := strconv.ParseInt(src, 0, 0)
+       if err != nil {
+               return 0, err
+       }
+
+       return updateTime * 1000 * 1000 * 1000, nil
+}
+
 func (o *checker) needOperate(ctx context.Context) *Result {
        if o.curNotNil {
                if o.updateTime == nil {
                        return nil
                }
 
-               updateTime, err := strconv.ParseInt(o.updateTime(), 0, 0)
+               updateTime, err := o.updateTime()
                if err != nil {
-                       log.Error("parse update time failed", err)
+                       log.Error("get update time failed", err)
                        return FailResult(err)
                }
-
-               updateTime = updateTime * 1000 * 1000 * 1000
                if updateTime >= o.event.Timestamp {
                        return SkipResult()
                }
diff --git a/syncer/service/replicator/resource/resource_test.go b/syncer/service/replicator/resource/resource_test.go
index 148dea7..5586c68 100644
--- a/syncer/service/replicator/resource/resource_test.go
+++ b/syncer/service/replicator/resource/resource_test.go
@@ -3,7 +3,6 @@ package resource
 import (
        "context"
        "errors"
-       "strconv"
        "testing"
        "time"
 
@@ -185,8 +184,8 @@ func TestNeedOperate(t *testing.T) {
                c := &checker{
                        curNotNil: true,
                        event:     e,
-                       updateTime: func() string {
-                               return 
strconv.FormatInt(time.Now().Add(-time.Minute).Unix(), 10)
+                       updateTime: func() (int64, error) {
+                               return time.Now().Add(-time.Minute).Unix(), nil
                        },
                        resourceID: "",
                }
@@ -194,8 +193,8 @@ func TestNeedOperate(t *testing.T) {
                r := c.needOperate(ctx)
                assert.Nil(t, r)
 
-               c.updateTime = func() string {
-                       return 
strconv.FormatInt(time.Now().Add(time.Minute).Unix(), 10)
+               c.updateTime = func() (int64, error) {
+                       return time.Now().Add(time.Minute).Unix(), nil
                }
 
                r = c.needOperate(ctx)
@@ -216,8 +215,8 @@ func TestNeedOperate(t *testing.T) {
                c := &checker{
                        curNotNil: false,
                        event:     e,
-                       updateTime: func() string {
-                               return 
strconv.FormatInt(time.Now().Add(-time.Minute).Unix(), 10)
+                       updateTime: func() (int64, error) {
+                               return time.Now().Add(-time.Minute).Unix(), nil
                        },
                        resourceID: "",
                }
diff --git a/syncer/service/replicator/resource/role.go b/syncer/service/replicator/resource/role.go
index ea91512..d2a8df9 100644
--- a/syncer/service/replicator/resource/role.go
+++ b/syncer/service/replicator/resource/role.go
@@ -89,8 +89,8 @@ func (r *role) NeedOperate(ctx context.Context) *Result {
        checker := &checker{
                curNotNil: r.cur != nil,
                event:     r.event,
-               updateTime: func() string {
-                       return r.cur.UpdateTime
+               updateTime: func() (int64, error) {
+                       return formatUpdateTimeSecond(r.cur.UpdateTime)
                },
                resourceID: r.input.Name,
        }
diff --git a/syncer/service/task/manager.go b/syncer/service/task/manager.go
index 5c51c32..e03e7bc 100644
--- a/syncer/service/task/manager.go
+++ b/syncer/service/task/manager.go
@@ -227,10 +227,7 @@ func (m *manager) handleResult(res *event.Result) {
        if res.Error != nil || res.Data.Code == resource.Fail {
                log.Error(fmt.Sprintf("get task %s result, return error", 
res.ID), res.Error)
                m.cache.Range(func(key, value interface{}) bool {
-                       if res.ID == key {
-                               m.cache.Delete(key)
-                               return false
-                       }
+                       m.cache.Delete(key)
                        return true
                })
                return

Reply via email to