This is an automated email from the ASF dual-hosted git repository.

humingcheng pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/servicecomb-service-center.git


The following commit(s) were added to refs/heads/dev by this push:
     new f8ffed7f sync again when reentry SyncAll (#1504)
f8ffed7f is described below

commit f8ffed7f41b3b4f8b353626b54e9cc794621f824
Author: Wanghb1 <[email protected]>
AuthorDate: Thu Mar 20 23:16:57 2025 +0800

    sync again when reentry SyncAll (#1504)
    
    sync again when reentry SyncAll
---
 datasource/etcd/sync.go      |  87 +++++++++++++++++--
 datasource/etcd/sync_test.go | 197 ++++++++++++++++++++++++++++++++++++++++---
 datasource/manager.go        |   1 +
 3 files changed, 266 insertions(+), 19 deletions(-)

diff --git a/datasource/etcd/sync.go b/datasource/etcd/sync.go
index 75c20f04..59bcb438 100644
--- a/datasource/etcd/sync.go
+++ b/datasource/etcd/sync.go
@@ -34,11 +34,13 @@ import (
        "github.com/apache/servicecomb-service-center/pkg/log"
        putil "github.com/apache/servicecomb-service-center/pkg/util"
        "github.com/apache/servicecomb-service-center/server/config"
+       syncconfig "github.com/apache/servicecomb-service-center/syncer/config"
 )
 
 const (
-       SyncAllKey     = "/cse-sr/sync-all"
-       SyncAllLockKey = "/cse-sr/sync-all-lock"
+       SyncAllKey      = "/cse-sr/sync-all"
+       SyncAllPeersKey = "/cse-sr/sync-all/peers"
+       SyncAllLockKey  = "/cse-sr/sync-all-lock"
 
        // one minutes
        defaultLockTime = 60
@@ -55,17 +57,31 @@ type SyncManager struct {
 func (s *SyncManager) SyncAll(ctx context.Context) error {
        enable := config.GetBool("sync.enableOnStart", false)
        if !enable {
-               return nil
+               return s.clearSyncHistory(ctx)
        }
+       peers := strings.Join(syncconfig.GetConfig().Sync.Peers[0].Endpoints, 
",")
        ctx = putil.SetContext(ctx, putil.CtxEnableSync, "1")
        exist, err := etcdadpt.Exist(ctx, SyncAllKey)
        if err != nil {
                return err
        }
-       if exist {
-               log.Info(fmt.Sprintf("%s key already exists, do not need to do 
tasks", SyncAllKey))
-               return datasource.ErrSyncAllKeyExists
+       if !exist {
+               return s.syncAll(ctx, peers)
+       }
+       kv, err := etcdadpt.Get(ctx, SyncAllPeersKey)
+       if err != nil {
+               return err
+       }
+       if kv == nil {
+               return etcdadpt.Put(ctx, SyncAllPeersKey, peers)
+       }
+       if s.peerEqual(peers, string(kv.Value)) {
+               return nil
        }
+       return s.syncAll(ctx, peers)
+}
+
+func (s *SyncManager) syncAll(ctx context.Context, peer string) error {
        lock, err := etcdadpt.TryLock(SyncAllLockKey, defaultLockTime)
        if err != nil || lock == nil {
                log.Info(fmt.Sprintf("%s lock not acquired", SyncAllLockKey))
@@ -101,7 +117,41 @@ func (s *SyncManager) SyncAll(ctx context.Context) error {
        if err != nil {
                return err
        }
-       return etcdadpt.Put(ctx, SyncAllKey, "1")
+       opts := []etcdadpt.OpOptions{
+               etcdadpt.OpPut(etcdadpt.WithStrKey(SyncAllKey), 
etcdadpt.WithValue([]byte("1"))),
+               etcdadpt.OpPut(etcdadpt.WithStrKey(SyncAllPeersKey),
+                       etcdadpt.WithValue([]byte(peer))),
+       }
+       return etcdadpt.Txn(ctx, opts)
+}
+
+func (s *SyncManager) clearSyncHistory(ctx context.Context) error {
+       existAllKey, err := etcdadpt.Exist(ctx, SyncAllKey)
+       if err != nil {
+               log.Error("get sync all key failed", err)
+               return err
+       }
+       if existAllKey {
+               _, err = etcdadpt.Delete(ctx, SyncAllKey)
+               if err != nil {
+                       log.Error("clear sync all key failed", err)
+                       return err
+               }
+       }
+       existPeerKey, err := etcdadpt.Exist(ctx, SyncAllPeersKey)
+       if err != nil {
+               log.Error("get sync all peers key failed", err)
+               return err
+       }
+       if existPeerKey {
+               _, err = etcdadpt.Delete(ctx, SyncAllPeersKey)
+               if err != nil {
+                       log.Error("clear sync all peers key failed", err)
+                       return err
+               }
+       }
+       log.Info("finish clear sync all history")
+       return nil
 }
 
 func syncAllAccounts(ctx context.Context) error {
@@ -406,3 +456,26 @@ func getDomainProject(key string, prefixKey string) 
(domain string, project stri
        project = splitStr[1]
        return
 }
+
+func (s *SyncManager) peerEqual(peers1, peers2 string) bool {
+       p1 := strings.Split(peers1, ",")
+       m1 := make(map[string]struct{}, len(p1))
+       for _, ep := range p1 {
+               m1[ep] = struct{}{}
+       }
+       p2 := strings.Split(peers2, ",")
+       m2 := make(map[string]struct{}, len(p2))
+       for _, ep := range p2 {
+               m2[ep] = struct{}{}
+       }
+       if len(m1) != len(m2) {
+               return false
+       }
+       for k := range m2 {
+               _, ok := m1[k]
+               if !ok {
+                       return false
+               }
+       }
+       return true
+}
diff --git a/datasource/etcd/sync_test.go b/datasource/etcd/sync_test.go
index 7646797c..b02c8443 100644
--- a/datasource/etcd/sync_test.go
+++ b/datasource/etcd/sync_test.go
@@ -29,16 +29,15 @@ import (
        "github.com/go-chassis/go-archaius"
        "github.com/stretchr/testify/assert"
 
-       "github.com/apache/servicecomb-service-center/eventbase/model"
-       "github.com/apache/servicecomb-service-center/eventbase/service/task"
-       
"github.com/apache/servicecomb-service-center/eventbase/service/tombstone"
-
        "github.com/apache/servicecomb-service-center/datasource"
        "github.com/apache/servicecomb-service-center/datasource/etcd"
        "github.com/apache/servicecomb-service-center/datasource/rbac"
        "github.com/apache/servicecomb-service-center/datasource/schema"
+       "github.com/apache/servicecomb-service-center/eventbase/model"
+       "github.com/apache/servicecomb-service-center/eventbase/service/task"
+       
"github.com/apache/servicecomb-service-center/eventbase/service/tombstone"
        "github.com/apache/servicecomb-service-center/pkg/util"
-       _ "github.com/apache/servicecomb-service-center/test"
+       "github.com/apache/servicecomb-service-center/syncer/config"
 )
 
 func syncAllContext() context.Context {
@@ -47,27 +46,69 @@ func syncAllContext() context.Context {
 }
 
 func TestSyncAll(t *testing.T) {
+
        t.Run("enableOnStart is false will not do sync", func(t *testing.T) {
                _ = archaius.Set("sync.enableOnStart", false)
                err := datasource.GetSyncManager().SyncAll(syncAllContext())
                assert.Nil(t, err)
        })
 
-       t.Run("enableOnStart is true and syncAllKey exists will not do sync", 
func(t *testing.T) {
+       t.Run("enableOnstart is true and syncAllKey not exists but 
SyncAllLockKey is lock will not do sync", func(t *testing.T) {
+               _ = archaius.Set("sync.enableOnStart", true)
+               config.SetConfig(config.Config{Sync: &config.Sync{
+                       Peers: []*config.Peer{{Endpoints: 
[]string{"127.0.0.1:30105", "127.0.0.2:30105"}}}}})
+               lock, err := etcdadpt.TryLock(etcd.SyncAllLockKey, 600)
+               assert.Nil(t, err)
+               err = datasource.GetSyncManager().SyncAll(syncAllContext())
+               assert.Nil(t, err)
+               listTaskReq := model.ListTaskRequest{
+                       Domain:  "sync-all",
+                       Project: "sync-all",
+               }
+               tasks, err := task.List(syncAllContext(), &listTaskReq)
+               assert.NoError(t, err)
+               assert.Equal(t, 0, len(tasks))
+               err = lock.Unlock()
+               assert.Nil(t, err)
+       })
+
+       t.Run("enableOnStart is true and syncAllKey exists and syncAllPeersKey 
not exist will put key but not sync", func(t *testing.T) {
                _ = archaius.Set("sync.enableOnStart", true)
+               config.SetConfig(config.Config{Sync: &config.Sync{
+                       Peers: []*config.Peer{{Endpoints: 
[]string{"127.0.0.1:30105", "127.0.0.2:30105"}}}}})
                err := etcdadpt.Put(syncAllContext(), etcd.SyncAllKey, "1")
                assert.Nil(t, err)
+               kv, err := etcdadpt.Get(syncAllContext(), etcd.SyncAllPeersKey)
+               assert.Nil(t, err)
+               assert.Nil(t, kv)
+
                err = datasource.GetSyncManager().SyncAll(syncAllContext())
-               assert.Equal(t, datasource.ErrSyncAllKeyExists, err)
-               isDeleted, err := etcdadpt.Delete(syncAllContext(), 
etcd.SyncAllKey)
-               assert.Equal(t, isDeleted, true)
+               assert.Nil(t, err)
+               listTaskReq := model.ListTaskRequest{
+                       Domain:  "sync-all",
+                       Project: "sync-all",
+               }
+               tasks, err := task.List(syncAllContext(), &listTaskReq)
+               assert.NoError(t, err)
+               assert.Equal(t, 0, len(tasks))
+               peers, err := etcdadpt.Get(syncAllContext(), 
etcd.SyncAllPeersKey)
+               assert.NoError(t, err)
+               assert.Equal(t, "127.0.0.1:30105,127.0.0.2:30105", 
string(peers.Value))
+               _, err = etcdadpt.Delete(syncAllContext(), etcd.SyncAllPeersKey)
+               assert.Nil(t, err)
+               _, err = etcdadpt.Delete(syncAllContext(), etcd.SyncAllKey)
                assert.Nil(t, err)
        })
 
-       t.Run("enableOnstart is true and syncAllKey not exists but 
SyncAllLockKey is lock will not do sync", func(t *testing.T) {
+       t.Run("enableOnStart is true and syncAllKey exists and syncAllPeersKey 
map current peers will not do sync", func(t *testing.T) {
                _ = archaius.Set("sync.enableOnStart", true)
-               lock, err := etcdadpt.TryLock(etcd.SyncAllLockKey, 600)
+               config.SetConfig(config.Config{Sync: &config.Sync{
+                       Peers: []*config.Peer{{Endpoints: 
[]string{"127.0.0.1:30105", "127.0.0.2:30105"}}}}})
+               err := etcdadpt.Put(syncAllContext(), etcd.SyncAllKey, "1")
                assert.Nil(t, err)
+               err = etcdadpt.Put(syncAllContext(), etcd.SyncAllPeersKey, 
"127.0.0.1:30105,127.0.0.2:30105")
+               assert.NoError(t, err)
+
                err = datasource.GetSyncManager().SyncAll(syncAllContext())
                assert.Nil(t, err)
                listTaskReq := model.ListTaskRequest{
@@ -77,17 +118,33 @@ func TestSyncAll(t *testing.T) {
                tasks, err := task.List(syncAllContext(), &listTaskReq)
                assert.NoError(t, err)
                assert.Equal(t, 0, len(tasks))
-               err = lock.Unlock()
+               peers, err := etcdadpt.Get(syncAllContext(), 
etcd.SyncAllPeersKey)
+               assert.NoError(t, err)
+               assert.Equal(t, "127.0.0.1:30105,127.0.0.2:30105", 
string(peers.Value))
+
+               _, err = etcdadpt.Delete(syncAllContext(), etcd.SyncAllPeersKey)
+               assert.Nil(t, err)
+               _, err = etcdadpt.Delete(syncAllContext(), etcd.SyncAllKey)
                assert.Nil(t, err)
        })
 
        t.Run("enableOnStart is true and syncAllKey not exists will do sync", 
func(t *testing.T) {
                _ = archaius.Set("sync.enableOnStart", true)
+               config.SetConfig(config.Config{Sync: &config.Sync{
+                       Peers: []*config.Peer{{Endpoints: 
[]string{"127.0.0.1:30105", "127.0.0.2:30105"}}}}})
                var serviceID string
                var accountName string
                var roleName string
                var consumerID string
                var providerID string
+
+               exist, err := etcdadpt.Exist(syncAllContext(), etcd.SyncAllKey)
+               assert.False(t, exist)
+               assert.NoError(t, err)
+               peers, err := etcdadpt.Get(syncAllContext(), 
etcd.SyncAllPeersKey)
+               assert.Nil(t, peers)
+               assert.NoError(t, err)
+
                t.Run("register a service and delete the task should pass", 
func(t *testing.T) {
                        resp, err := 
datasource.GetMetadataManager().RegisterService(syncAllContext(), 
&pb.CreateServiceRequest{
                                Service: &pb.MicroService{
@@ -414,6 +471,14 @@ func TestSyncAll(t *testing.T) {
                        isDelete, err := etcdadpt.Delete(syncAllContext(), 
etcd.SyncAllKey)
                        assert.Equal(t, true, isDelete)
                        assert.Nil(t, err)
+
+                       peers, err := etcdadpt.Get(syncAllContext(), 
etcd.SyncAllPeersKey)
+                       assert.NoError(t, err)
+                       assert.Equal(t, "127.0.0.1:30105,127.0.0.2:30105", 
string(peers.Value))
+
+                       isDelete, err = etcdadpt.Delete(syncAllContext(), 
etcd.SyncAllPeersKey)
+                       assert.Equal(t, true, isDelete)
+                       assert.Nil(t, err)
                })
 
                t.Run("delete all resources should pass", func(t *testing.T) {
@@ -512,8 +577,116 @@ func TestSyncAll(t *testing.T) {
                })
        })
 
+       t.Run("enableOnStart is true and syncAllKey exists and syncAllPeersKey 
do not map current peers will do sync", func(t *testing.T) {
+               _ = archaius.Set("sync.enableOnStart", true)
+               config.SetConfig(config.Config{Sync: &config.Sync{
+                       Peers: []*config.Peer{{Endpoints: 
[]string{"127.0.0.3:30105", "127.0.0.4:30105"}}}}})
+               err := etcdadpt.Put(syncAllContext(), etcd.SyncAllKey, "1")
+               assert.Nil(t, err)
+               err = etcdadpt.Put(syncAllContext(), etcd.SyncAllPeersKey, 
"127.0.0.1:30105,127.0.0.2:30105")
+               assert.NoError(t, err)
+
+               var accountName string
+
+               t.Run("create a account and delete the task should pass", 
func(t *testing.T) {
+                       a1 := crbac.Account{
+                               ID:                  
"sync-create-11111-sync-all",
+                               Name:                
"sync-create-account1-sync-all",
+                               Password:            "tnuocca-tset",
+                               Roles:               []string{"admin"},
+                               TokenExpirationTime: "2020-12-30",
+                               CurrentPassword:     "tnuocca-tset1",
+                       }
+                       err := rbac.Instance().CreateAccount(syncAllContext(), 
&a1)
+                       assert.NoError(t, err)
+                       accountName = a1.Name
+                       r, err := rbac.Instance().GetAccount(syncAllContext(), 
a1.Name)
+                       assert.NoError(t, err)
+                       assert.Equal(t, a1, *r)
+                       listTaskReq := model.ListTaskRequest{
+                               Domain:       "sync-all",
+                               Project:      "sync-all",
+                               ResourceType: datasource.ResourceAccount,
+                       }
+                       tasks, err := task.List(syncAllContext(), &listTaskReq)
+                       assert.NoError(t, err)
+                       assert.Equal(t, 1, len(tasks))
+                       err = task.Delete(syncAllContext(), tasks...)
+                       assert.NoError(t, err)
+                       tasks, err = task.List(syncAllContext(), &listTaskReq)
+                       assert.NoError(t, err)
+                       assert.Equal(t, 0, len(tasks))
+               })
+
+               t.Run("do sync will create task should pass", func(t 
*testing.T) {
+                       err := 
datasource.GetSyncManager().SyncAll(syncAllContext())
+                       assert.Nil(t, err)
+
+                       listAccountTaskReq := model.ListTaskRequest{
+                               Domain:       "",
+                               Project:      "",
+                               ResourceType: datasource.ResourceAccount,
+                       }
+                       tasks, err := task.List(syncAllContext(), 
&listAccountTaskReq)
+                       assert.NoError(t, err)
+                       assert.Equal(t, 1, len(tasks))
+                       err = task.Delete(syncAllContext(), tasks...)
+                       assert.NoError(t, err)
+                       tasks, err = task.List(syncAllContext(), 
&listAccountTaskReq)
+                       assert.NoError(t, err)
+                       assert.Equal(t, 0, len(tasks))
+
+                       exist, err := etcdadpt.Exist(syncAllContext(), 
etcd.SyncAllKey)
+                       assert.Equal(t, true, exist)
+                       assert.Nil(t, err)
+
+                       isDelete, err := etcdadpt.Delete(syncAllContext(), 
etcd.SyncAllKey)
+                       assert.Equal(t, true, isDelete)
+                       assert.Nil(t, err)
+
+                       peers, err := etcdadpt.Get(syncAllContext(), 
etcd.SyncAllPeersKey)
+                       assert.NoError(t, err)
+                       assert.Equal(t, "127.0.0.3:30105,127.0.0.4:30105", 
string(peers.Value))
+
+                       isDelete, err = etcdadpt.Delete(syncAllContext(), 
etcd.SyncAllPeersKey)
+                       assert.Equal(t, true, isDelete)
+                       assert.Nil(t, err)
+               })
+
+               t.Run("delete all resources should pass", func(t *testing.T) {
+                       _, err = 
rbac.Instance().DeleteAccount(syncAllContext(), []string{accountName})
+                       assert.NoError(t, err)
+
+                       listAccountTaskReq := model.ListTaskRequest{
+                               Domain:       "sync-all",
+                               Project:      "sync-all",
+                               ResourceType: datasource.ResourceAccount,
+                       }
+                       tasks, err := task.List(syncAllContext(), 
&listAccountTaskReq)
+                       assert.NoError(t, err)
+                       assert.Equal(t, 1, len(tasks))
+                       err = task.Delete(syncAllContext(), tasks...)
+                       assert.NoError(t, err)
+                       tasks, err = task.List(syncAllContext(), 
&listAccountTaskReq)
+                       assert.NoError(t, err)
+                       assert.Equal(t, 0, len(tasks))
+
+                       tombstoneListReq := model.ListTombstoneRequest{
+                               Domain:  "sync-all",
+                               Project: "sync-all",
+                       }
+                       tombstones, err := tombstone.List(syncAllContext(), 
&tombstoneListReq)
+                       assert.NoError(t, err)
+                       assert.Equal(t, 1, len(tombstones))
+                       err = tombstone.Delete(syncAllContext(), tombstones...)
+                       assert.NoError(t, err)
+               })
+       })
+
        t.Run("enableOnStart is true ,syncAllKey not exists and context is 
context.Background() will do sync", func(t *testing.T) {
                _ = archaius.Set("sync.enableOnStart", true)
+               config.SetConfig(config.Config{Sync: &config.Sync{
+                       Peers: []*config.Peer{{Endpoints: 
[]string{"127.0.0.1:30105", "127.0.0.2:30105"}}}}})
                var accountName string
                ctx := 
util.WithNoCache(util.SetDomainProject(context.Background(), 
"sync-all-background", "sync-all-background"))
                ctx = util.WithNoCache(util.SetContext(ctx, util.CtxEnableSync, 
"1"))
diff --git a/datasource/manager.go b/datasource/manager.go
index 9ede3fa9..9f1e68a8 100644
--- a/datasource/manager.go
+++ b/datasource/manager.go
@@ -54,6 +54,7 @@ func Init(opts Options) error {
        }
        err = GetSyncManager().SyncAll(context.Background())
        if err != nil && err != ErrSyncAllKeyExists {
+               log.Fatal("sync all failed", err)
                return err
        }
        err = schema.Init(schema.Options{Kind: opts.Kind})

Reply via email to