This is an automated email from the ASF dual-hosted git repository.
humingcheng pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/servicecomb-service-center.git
The following commit(s) were added to refs/heads/dev by this push:
new f8ffed7f sync again when reentry SyncAll (#1504)
f8ffed7f is described below
commit f8ffed7f41b3b4f8b353626b54e9cc794621f824
Author: Wanghb1 <[email protected]>
AuthorDate: Thu Mar 20 23:16:57 2025 +0800
sync again when reentry SyncAll (#1504)
sync again when reentry SyncAll
---
datasource/etcd/sync.go | 87 +++++++++++++++++--
datasource/etcd/sync_test.go | 197 ++++++++++++++++++++++++++++++++++++++++---
datasource/manager.go | 1 +
3 files changed, 266 insertions(+), 19 deletions(-)
diff --git a/datasource/etcd/sync.go b/datasource/etcd/sync.go
index 75c20f04..59bcb438 100644
--- a/datasource/etcd/sync.go
+++ b/datasource/etcd/sync.go
@@ -34,11 +34,13 @@ import (
"github.com/apache/servicecomb-service-center/pkg/log"
putil "github.com/apache/servicecomb-service-center/pkg/util"
"github.com/apache/servicecomb-service-center/server/config"
+ syncconfig "github.com/apache/servicecomb-service-center/syncer/config"
)
const (
- SyncAllKey = "/cse-sr/sync-all"
- SyncAllLockKey = "/cse-sr/sync-all-lock"
+ SyncAllKey = "/cse-sr/sync-all"
+ SyncAllPeersKey = "/cse-sr/sync-all/peers"
+ SyncAllLockKey = "/cse-sr/sync-all-lock"
// one minutes
defaultLockTime = 60
@@ -55,17 +57,31 @@ type SyncManager struct {
func (s *SyncManager) SyncAll(ctx context.Context) error {
enable := config.GetBool("sync.enableOnStart", false)
if !enable {
- return nil
+ return s.clearSyncHistory(ctx)
}
+ peers := strings.Join(syncconfig.GetConfig().Sync.Peers[0].Endpoints,
",")
ctx = putil.SetContext(ctx, putil.CtxEnableSync, "1")
exist, err := etcdadpt.Exist(ctx, SyncAllKey)
if err != nil {
return err
}
- if exist {
- log.Info(fmt.Sprintf("%s key already exists, do not need to do
tasks", SyncAllKey))
- return datasource.ErrSyncAllKeyExists
+ if !exist {
+ return s.syncAll(ctx, peers)
+ }
+ kv, err := etcdadpt.Get(ctx, SyncAllPeersKey)
+ if err != nil {
+ return err
+ }
+ if kv == nil {
+ return etcdadpt.Put(ctx, SyncAllPeersKey, peers)
+ }
+ if s.peerEqual(peers, string(kv.Value)) {
+ return nil
}
+ return s.syncAll(ctx, peers)
+}
+
+func (s *SyncManager) syncAll(ctx context.Context, peer string) error {
lock, err := etcdadpt.TryLock(SyncAllLockKey, defaultLockTime)
if err != nil || lock == nil {
log.Info(fmt.Sprintf("%s lock not acquired", SyncAllLockKey))
@@ -101,7 +117,41 @@ func (s *SyncManager) SyncAll(ctx context.Context) error {
if err != nil {
return err
}
- return etcdadpt.Put(ctx, SyncAllKey, "1")
+ opts := []etcdadpt.OpOptions{
+ etcdadpt.OpPut(etcdadpt.WithStrKey(SyncAllKey),
etcdadpt.WithValue([]byte("1"))),
+ etcdadpt.OpPut(etcdadpt.WithStrKey(SyncAllPeersKey),
+ etcdadpt.WithValue([]byte(peer))),
+ }
+ return etcdadpt.Txn(ctx, opts)
+}
+
+func (s *SyncManager) clearSyncHistory(ctx context.Context) error {
+ existAllKey, err := etcdadpt.Exist(ctx, SyncAllKey)
+ if err != nil {
+ log.Error("get sync all key failed", err)
+ return err
+ }
+ if existAllKey {
+ _, err = etcdadpt.Delete(ctx, SyncAllKey)
+ if err != nil {
+ log.Error("clear sync all key failed", err)
+ return err
+ }
+ }
+ existPeerKey, err := etcdadpt.Exist(ctx, SyncAllPeersKey)
+ if err != nil {
+ log.Error("get sync all peers key failed", err)
+ return err
+ }
+ if existPeerKey {
+ _, err = etcdadpt.Delete(ctx, SyncAllPeersKey)
+ if err != nil {
+ log.Error("clear sync all peers key failed", err)
+ return err
+ }
+ }
+ log.Info("finish clear sync all history")
+ return nil
}
func syncAllAccounts(ctx context.Context) error {
@@ -406,3 +456,26 @@ func getDomainProject(key string, prefixKey string)
(domain string, project stri
project = splitStr[1]
return
}
+
+func (s *SyncManager) peerEqual(peers1, peers2 string) bool {
+ p1 := strings.Split(peers1, ",")
+ m1 := make(map[string]struct{}, len(p1))
+ for _, ep := range p1 {
+ m1[ep] = struct{}{}
+ }
+ p2 := strings.Split(peers2, ",")
+ m2 := make(map[string]struct{}, len(p2))
+ for _, ep := range p2 {
+ m2[ep] = struct{}{}
+ }
+ if len(m1) != len(m2) {
+ return false
+ }
+ for k := range m2 {
+ _, ok := m1[k]
+ if !ok {
+ return false
+ }
+ }
+ return true
+}
diff --git a/datasource/etcd/sync_test.go b/datasource/etcd/sync_test.go
index 7646797c..b02c8443 100644
--- a/datasource/etcd/sync_test.go
+++ b/datasource/etcd/sync_test.go
@@ -29,16 +29,15 @@ import (
"github.com/go-chassis/go-archaius"
"github.com/stretchr/testify/assert"
- "github.com/apache/servicecomb-service-center/eventbase/model"
- "github.com/apache/servicecomb-service-center/eventbase/service/task"
-
"github.com/apache/servicecomb-service-center/eventbase/service/tombstone"
-
"github.com/apache/servicecomb-service-center/datasource"
"github.com/apache/servicecomb-service-center/datasource/etcd"
"github.com/apache/servicecomb-service-center/datasource/rbac"
"github.com/apache/servicecomb-service-center/datasource/schema"
+ "github.com/apache/servicecomb-service-center/eventbase/model"
+ "github.com/apache/servicecomb-service-center/eventbase/service/task"
+
"github.com/apache/servicecomb-service-center/eventbase/service/tombstone"
"github.com/apache/servicecomb-service-center/pkg/util"
- _ "github.com/apache/servicecomb-service-center/test"
+ "github.com/apache/servicecomb-service-center/syncer/config"
)
func syncAllContext() context.Context {
@@ -47,27 +46,69 @@ func syncAllContext() context.Context {
}
func TestSyncAll(t *testing.T) {
+
t.Run("enableOnStart is false will not do sync", func(t *testing.T) {
_ = archaius.Set("sync.enableOnStart", false)
err := datasource.GetSyncManager().SyncAll(syncAllContext())
assert.Nil(t, err)
})
- t.Run("enableOnStart is true and syncAllKey exists will not do sync",
func(t *testing.T) {
+ t.Run("enableOnstart is true and syncAllKey not exists but
SyncAllLockKey is lock will not do sync", func(t *testing.T) {
+ _ = archaius.Set("sync.enableOnStart", true)
+ config.SetConfig(config.Config{Sync: &config.Sync{
+ Peers: []*config.Peer{{Endpoints:
[]string{"127.0.0.1:30105", "127.0.0.2:30105"}}}}})
+ lock, err := etcdadpt.TryLock(etcd.SyncAllLockKey, 600)
+ assert.Nil(t, err)
+ err = datasource.GetSyncManager().SyncAll(syncAllContext())
+ assert.Nil(t, err)
+ listTaskReq := model.ListTaskRequest{
+ Domain: "sync-all",
+ Project: "sync-all",
+ }
+ tasks, err := task.List(syncAllContext(), &listTaskReq)
+ assert.NoError(t, err)
+ assert.Equal(t, 0, len(tasks))
+ err = lock.Unlock()
+ assert.Nil(t, err)
+ })
+
+ t.Run("enableOnStart is true and syncAllKey exists and syncAllPeersKey
not exist will put key but not sync", func(t *testing.T) {
_ = archaius.Set("sync.enableOnStart", true)
+ config.SetConfig(config.Config{Sync: &config.Sync{
+ Peers: []*config.Peer{{Endpoints:
[]string{"127.0.0.1:30105", "127.0.0.2:30105"}}}}})
err := etcdadpt.Put(syncAllContext(), etcd.SyncAllKey, "1")
assert.Nil(t, err)
+ kv, err := etcdadpt.Get(syncAllContext(), etcd.SyncAllPeersKey)
+ assert.Nil(t, err)
+ assert.Nil(t, kv)
+
err = datasource.GetSyncManager().SyncAll(syncAllContext())
- assert.Equal(t, datasource.ErrSyncAllKeyExists, err)
- isDeleted, err := etcdadpt.Delete(syncAllContext(),
etcd.SyncAllKey)
- assert.Equal(t, isDeleted, true)
+ assert.Nil(t, err)
+ listTaskReq := model.ListTaskRequest{
+ Domain: "sync-all",
+ Project: "sync-all",
+ }
+ tasks, err := task.List(syncAllContext(), &listTaskReq)
+ assert.NoError(t, err)
+ assert.Equal(t, 0, len(tasks))
+ peers, err := etcdadpt.Get(syncAllContext(),
etcd.SyncAllPeersKey)
+ assert.NoError(t, err)
+ assert.Equal(t, "127.0.0.1:30105,127.0.0.2:30105",
string(peers.Value))
+ _, err = etcdadpt.Delete(syncAllContext(), etcd.SyncAllPeersKey)
+ assert.Nil(t, err)
+ _, err = etcdadpt.Delete(syncAllContext(), etcd.SyncAllKey)
assert.Nil(t, err)
})
- t.Run("enableOnstart is true and syncAllKey not exists but
SyncAllLockKey is lock will not do sync", func(t *testing.T) {
+ t.Run("enableOnStart is true and syncAllKey exists and syncAllPeersKey
map current peers will not do sync", func(t *testing.T) {
_ = archaius.Set("sync.enableOnStart", true)
- lock, err := etcdadpt.TryLock(etcd.SyncAllLockKey, 600)
+ config.SetConfig(config.Config{Sync: &config.Sync{
+ Peers: []*config.Peer{{Endpoints:
[]string{"127.0.0.1:30105", "127.0.0.2:30105"}}}}})
+ err := etcdadpt.Put(syncAllContext(), etcd.SyncAllKey, "1")
assert.Nil(t, err)
+ err = etcdadpt.Put(syncAllContext(), etcd.SyncAllPeersKey,
"127.0.0.1:30105,127.0.0.2:30105")
+ assert.NoError(t, err)
+
err = datasource.GetSyncManager().SyncAll(syncAllContext())
assert.Nil(t, err)
listTaskReq := model.ListTaskRequest{
@@ -77,17 +118,33 @@ func TestSyncAll(t *testing.T) {
tasks, err := task.List(syncAllContext(), &listTaskReq)
assert.NoError(t, err)
assert.Equal(t, 0, len(tasks))
- err = lock.Unlock()
+ peers, err := etcdadpt.Get(syncAllContext(),
etcd.SyncAllPeersKey)
+ assert.NoError(t, err)
+ assert.Equal(t, "127.0.0.1:30105,127.0.0.2:30105",
string(peers.Value))
+
+ _, err = etcdadpt.Delete(syncAllContext(), etcd.SyncAllPeersKey)
+ assert.Nil(t, err)
+ _, err = etcdadpt.Delete(syncAllContext(), etcd.SyncAllKey)
assert.Nil(t, err)
})
t.Run("enableOnStart is true and syncAllKey not exists will do sync",
func(t *testing.T) {
_ = archaius.Set("sync.enableOnStart", true)
+ config.SetConfig(config.Config{Sync: &config.Sync{
+ Peers: []*config.Peer{{Endpoints:
[]string{"127.0.0.1:30105", "127.0.0.2:30105"}}}}})
var serviceID string
var accountName string
var roleName string
var consumerID string
var providerID string
+
+ exist, err := etcdadpt.Exist(syncAllContext(), etcd.SyncAllKey)
+ assert.False(t, exist)
+ assert.NoError(t, err)
+ peers, err := etcdadpt.Get(syncAllContext(),
etcd.SyncAllPeersKey)
+ assert.Nil(t, peers)
+ assert.NoError(t, err)
+
t.Run("register a service and delete the task should pass",
func(t *testing.T) {
resp, err :=
datasource.GetMetadataManager().RegisterService(syncAllContext(),
&pb.CreateServiceRequest{
Service: &pb.MicroService{
@@ -414,6 +471,14 @@ func TestSyncAll(t *testing.T) {
isDelete, err := etcdadpt.Delete(syncAllContext(),
etcd.SyncAllKey)
assert.Equal(t, true, isDelete)
assert.Nil(t, err)
+
+ peers, err := etcdadpt.Get(syncAllContext(),
etcd.SyncAllPeersKey)
+ assert.NoError(t, err)
+ assert.Equal(t, "127.0.0.1:30105,127.0.0.2:30105",
string(peers.Value))
+
+ isDelete, err = etcdadpt.Delete(syncAllContext(),
etcd.SyncAllPeersKey)
+ assert.Equal(t, true, isDelete)
+ assert.Nil(t, err)
})
t.Run("delete all resources should pass", func(t *testing.T) {
@@ -512,8 +577,116 @@ func TestSyncAll(t *testing.T) {
})
})
+ t.Run("enableOnStart is true and syncAllKey exists and syncAllPeersKey
do not map current peers will do sync", func(t *testing.T) {
+ _ = archaius.Set("sync.enableOnStart", true)
+ config.SetConfig(config.Config{Sync: &config.Sync{
+ Peers: []*config.Peer{{Endpoints:
[]string{"127.0.0.3:30105", "127.0.0.4:30105"}}}}})
+ err := etcdadpt.Put(syncAllContext(), etcd.SyncAllKey, "1")
+ assert.Nil(t, err)
+ err = etcdadpt.Put(syncAllContext(), etcd.SyncAllPeersKey,
"127.0.0.1:30105,127.0.0.2:30105")
+ assert.NoError(t, err)
+
+ var accountName string
+
+ t.Run("create a account and delete the task should pass",
func(t *testing.T) {
+ a1 := crbac.Account{
+ ID:
"sync-create-11111-sync-all",
+ Name:
"sync-create-account1-sync-all",
+ Password: "tnuocca-tset",
+ Roles: []string{"admin"},
+ TokenExpirationTime: "2020-12-30",
+ CurrentPassword: "tnuocca-tset1",
+ }
+ err := rbac.Instance().CreateAccount(syncAllContext(),
&a1)
+ assert.NoError(t, err)
+ accountName = a1.Name
+ r, err := rbac.Instance().GetAccount(syncAllContext(),
a1.Name)
+ assert.NoError(t, err)
+ assert.Equal(t, a1, *r)
+ listTaskReq := model.ListTaskRequest{
+ Domain: "sync-all",
+ Project: "sync-all",
+ ResourceType: datasource.ResourceAccount,
+ }
+ tasks, err := task.List(syncAllContext(), &listTaskReq)
+ assert.NoError(t, err)
+ assert.Equal(t, 1, len(tasks))
+ err = task.Delete(syncAllContext(), tasks...)
+ assert.NoError(t, err)
+ tasks, err = task.List(syncAllContext(), &listTaskReq)
+ assert.NoError(t, err)
+ assert.Equal(t, 0, len(tasks))
+ })
+
+ t.Run("do sync will create task should pass", func(t
*testing.T) {
+ err :=
datasource.GetSyncManager().SyncAll(syncAllContext())
+ assert.Nil(t, err)
+
+ listAccountTaskReq := model.ListTaskRequest{
+ Domain: "",
+ Project: "",
+ ResourceType: datasource.ResourceAccount,
+ }
+ tasks, err := task.List(syncAllContext(),
&listAccountTaskReq)
+ assert.NoError(t, err)
+ assert.Equal(t, 1, len(tasks))
+ err = task.Delete(syncAllContext(), tasks...)
+ assert.NoError(t, err)
+ tasks, err = task.List(syncAllContext(),
&listAccountTaskReq)
+ assert.NoError(t, err)
+ assert.Equal(t, 0, len(tasks))
+
+ exist, err := etcdadpt.Exist(syncAllContext(),
etcd.SyncAllKey)
+ assert.Equal(t, true, exist)
+ assert.Nil(t, err)
+
+ isDelete, err := etcdadpt.Delete(syncAllContext(),
etcd.SyncAllKey)
+ assert.Equal(t, true, isDelete)
+ assert.Nil(t, err)
+
+ peers, err := etcdadpt.Get(syncAllContext(),
etcd.SyncAllPeersKey)
+ assert.NoError(t, err)
+ assert.Equal(t, "127.0.0.3:30105,127.0.0.4:30105",
string(peers.Value))
+
+ isDelete, err = etcdadpt.Delete(syncAllContext(),
etcd.SyncAllPeersKey)
+ assert.Equal(t, true, isDelete)
+ assert.Nil(t, err)
+ })
+
+ t.Run("delete all resources should pass", func(t *testing.T) {
+ _, err =
rbac.Instance().DeleteAccount(syncAllContext(), []string{accountName})
+ assert.NoError(t, err)
+
+ listAccountTaskReq := model.ListTaskRequest{
+ Domain: "sync-all",
+ Project: "sync-all",
+ ResourceType: datasource.ResourceAccount,
+ }
+ tasks, err := task.List(syncAllContext(),
&listAccountTaskReq)
+ assert.NoError(t, err)
+ assert.Equal(t, 1, len(tasks))
+ err = task.Delete(syncAllContext(), tasks...)
+ assert.NoError(t, err)
+ tasks, err = task.List(syncAllContext(),
&listAccountTaskReq)
+ assert.NoError(t, err)
+ assert.Equal(t, 0, len(tasks))
+
+ tombstoneListReq := model.ListTombstoneRequest{
+ Domain: "sync-all",
+ Project: "sync-all",
+ }
+ tombstones, err := tombstone.List(syncAllContext(),
&tombstoneListReq)
+ assert.NoError(t, err)
+ assert.Equal(t, 1, len(tombstones))
+ err = tombstone.Delete(syncAllContext(), tombstones...)
+ assert.NoError(t, err)
+ })
+ })
+
t.Run("enableOnStart is true ,syncAllKey not exists and context is
context.Background() will do sync", func(t *testing.T) {
_ = archaius.Set("sync.enableOnStart", true)
+ config.SetConfig(config.Config{Sync: &config.Sync{
+ Peers: []*config.Peer{{Endpoints:
[]string{"127.0.0.1:30105", "127.0.0.2:30105"}}}}})
var accountName string
ctx :=
util.WithNoCache(util.SetDomainProject(context.Background(),
"sync-all-background", "sync-all-background"))
ctx = util.WithNoCache(util.SetContext(ctx, util.CtxEnableSync,
"1"))
diff --git a/datasource/manager.go b/datasource/manager.go
index 9ede3fa9..9f1e68a8 100644
--- a/datasource/manager.go
+++ b/datasource/manager.go
@@ -54,6 +54,7 @@ func Init(opts Options) error {
}
err = GetSyncManager().SyncAll(context.Background())
if err != nil && err != ErrSyncAllKeyExists {
+ log.Fatal("sync all failed", err)
return err
}
err = schema.Init(schema.Options{Kind: opts.Kind})