This is an automated email from the ASF dual-hosted git repository.

tianxiaoliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/servicecomb-service-center.git


The following commit(s) were added to refs/heads/master by this push:
     new 738f155  [feat] add tag sync func and ut when db mode is etcd (#1204)
738f155 is described below

commit 738f155d006d5e47e4e100257e7b3a8600a3ce5f
Author: robotljw <[email protected]>
AuthorDate: Fri Jan 7 12:47:54 2022 +0800

    [feat] add tag sync func and ut when db mode is etcd (#1204)
---
 datasource/common.go              |   1 +
 datasource/etcd/account.go        |  65 +++++-------
 datasource/etcd/account_test.go   |   8 +-
 datasource/etcd/dep.go            |  38 +++----
 datasource/etcd/ms.go             | 194 ++++++++++++++++------------------
 datasource/etcd/role.go           |  59 +++++------
 datasource/etcd/role_test.go      |   8 +-
 datasource/etcd/sync/sync.go      | 126 ++++++++++++++++++++++
 datasource/etcd/sync/sync_test.go |  58 +++++++++++
 datasource/etcd/tag_test.go       | 214 ++++++++++++++++++++++++++++++++++++++
 datasource/etcd/task_util.go      |  41 --------
 datasource/etcd/tombstone_util.go |  38 -------
 datasource/etcd/util/tag_util.go  |  21 ++--
 13 files changed, 581 insertions(+), 290 deletions(-)

diff --git a/datasource/common.go b/datasource/common.go
index a3b9453..5149fb2 100644
--- a/datasource/common.go
+++ b/datasource/common.go
@@ -36,6 +36,7 @@ const (
        ResourceRole       = "role"
        ResourceDependency = "dependency"
        ResourceService    = "service"
+       ResourceKV         = "kv"
 )
 
 // WrapErrResponse is temp func here to wait finish to refact the discosvc pkg
diff --git a/datasource/etcd/account.go b/datasource/etcd/account.go
index d94352e..c0aded7 100644
--- a/datasource/etcd/account.go
+++ b/datasource/etcd/account.go
@@ -22,13 +22,13 @@ import (
        "strconv"
        "time"
 
-       rbacmodel "github.com/go-chassis/cari/rbac"
-       "github.com/go-chassis/cari/sync"
+       crbac "github.com/go-chassis/cari/rbac"
        "github.com/go-chassis/foundation/stringutil"
        "github.com/little-cui/etcdadpt"
 
        "github.com/apache/servicecomb-service-center/datasource"
        "github.com/apache/servicecomb-service-center/datasource/etcd/path"
+       esync 
"github.com/apache/servicecomb-service-center/datasource/etcd/sync"
        "github.com/apache/servicecomb-service-center/datasource/rbac"
        "github.com/apache/servicecomb-service-center/pkg/etcdsync"
        "github.com/apache/servicecomb-service-center/pkg/log"
@@ -49,7 +49,7 @@ func NewRbacDAO(opts rbac.Options) (rbac.DAO, error) {
 type RbacDAO struct {
 }
 
-func (ds *RbacDAO) CreateAccount(ctx context.Context, a *rbacmodel.Account) 
error {
+func (ds *RbacDAO) CreateAccount(ctx context.Context, a *crbac.Account) error {
        lock, err := etcdsync.Lock("/account-creating/"+a.Name, -1, false)
        if err != nil {
                return fmt.Errorf("account %s is creating", a.Name)
@@ -83,14 +83,12 @@ func (ds *RbacDAO) CreateAccount(ctx context.Context, a 
*rbacmodel.Account) erro
                log.Error("", err)
                return err
        }
-       if datasource.EnableSync {
-               op, err := GenTaskOpts("", "", sync.CreateAction, 
datasource.ResourceAccount, a)
-               if err != nil {
-                       log.Error("", err)
-                       return err
-               }
-               opts = append(opts, op)
+       syncOpts, err := esync.GenCreateOpts(ctx, datasource.ResourceAccount, a)
+       if err != nil {
+               log.Error("fail to create sync opts", err)
+               return err
        }
+       opts = append(opts, syncOpts...)
        err = etcdadpt.Txn(ctx, opts)
        if err != nil {
                log.Error("can not save account info", err)
@@ -100,7 +98,7 @@ func (ds *RbacDAO) CreateAccount(ctx context.Context, a 
*rbacmodel.Account) erro
        return nil
 }
 
-func GenAccountOpts(a *rbacmodel.Account, action etcdadpt.Action) 
([]etcdadpt.OpOptions, error) {
+func GenAccountOpts(a *crbac.Account, action etcdadpt.Action) 
([]etcdadpt.OpOptions, error) {
        opts := make([]etcdadpt.OpOptions, 0)
        value, err := json.Marshal(a)
        if err != nil {
@@ -126,7 +124,7 @@ func GenAccountOpts(a *rbacmodel.Account, action 
etcdadpt.Action) ([]etcdadpt.Op
 func (ds *RbacDAO) AccountExist(ctx context.Context, name string) (bool, 
error) {
        return etcdadpt.Exist(ctx, path.GenerateRBACAccountKey(name))
 }
-func (ds *RbacDAO) GetAccount(ctx context.Context, name string) 
(*rbacmodel.Account, error) {
+func (ds *RbacDAO) GetAccount(ctx context.Context, name string) 
(*crbac.Account, error) {
        kv, err := etcdadpt.Get(ctx, path.GenerateRBACAccountKey(name))
        if err != nil {
                return nil, err
@@ -134,7 +132,7 @@ func (ds *RbacDAO) GetAccount(ctx context.Context, name 
string) (*rbacmodel.Acco
        if kv == nil {
                return nil, rbac.ErrAccountNotExist
        }
-       account := &rbacmodel.Account{}
+       account := &crbac.Account{}
        err = json.Unmarshal(kv.Value, account)
        if err != nil {
                log.Error("account info format invalid", err)
@@ -144,7 +142,7 @@ func (ds *RbacDAO) GetAccount(ctx context.Context, name 
string) (*rbacmodel.Acco
        return account, nil
 }
 
-func (ds *RbacDAO) compatibleOldVersionAccount(a *rbacmodel.Account) {
+func (ds *RbacDAO) compatibleOldVersionAccount(a *crbac.Account) {
        // old version use Role, now use Roles
        // Role/Roles will not exist at the same time
        if len(a.Role) == 0 {
@@ -154,14 +152,14 @@ func (ds *RbacDAO) compatibleOldVersionAccount(a 
*rbacmodel.Account) {
        a.Role = ""
 }
 
-func (ds *RbacDAO) ListAccount(ctx context.Context) ([]*rbacmodel.Account, 
int64, error) {
+func (ds *RbacDAO) ListAccount(ctx context.Context) ([]*crbac.Account, int64, 
error) {
        kvs, n, err := etcdadpt.List(ctx, path.GenerateRBACAccountKey(""))
        if err != nil {
                return nil, 0, err
        }
-       accounts := make([]*rbacmodel.Account, 0, n)
+       accounts := make([]*crbac.Account, 0, n)
        for _, v := range kvs {
-               a := &rbacmodel.Account{}
+               a := &crbac.Account{}
                err = json.Unmarshal(v.Value, a)
                if err != nil {
                        log.Error("account info format invalid:", err)
@@ -194,20 +192,13 @@ func (ds *RbacDAO) DeleteAccount(ctx context.Context, 
names []string) (bool, err
                        continue //do not fail if some account is invalid
 
                }
-               if datasource.EnableSync {
-                       taskOpt, err := GenTaskOpts("", "", sync.DeleteAction, 
datasource.ResourceAccount, a)
-                       if err != nil {
-                               log.Error("", err)
-                               return false, err
-                       }
-                       tombstoneOpt, err := GenTombstoneOpts("", "", 
datasource.ResourceAccount, a.Name)
-                       if err != nil {
-                               log.Error("", err)
-                               return false, err
-                       }
-                       opts = append(opts, tombstoneOpt, taskOpt)
+               syncOpts, err := esync.GenDeleteOpts(ctx, 
datasource.ResourceAccount, a.Name, a)
+               if err != nil {
+                       log.Error("fail to create sync opts", err)
+                       return false, err
                }
                allOpts = append(allOpts, opts...)
+               allOpts = append(allOpts, syncOpts...)
        }
        err := etcdadpt.Txn(ctx, allOpts)
        if err != nil {
@@ -217,7 +208,7 @@ func (ds *RbacDAO) DeleteAccount(ctx context.Context, names 
[]string) (bool, err
        return true, nil
 }
 
-func (ds *RbacDAO) UpdateAccount(ctx context.Context, name string, account 
*rbacmodel.Account) error {
+func (ds *RbacDAO) UpdateAccount(ctx context.Context, name string, account 
*crbac.Account) error {
        var (
                opts []etcdadpt.OpOptions
                err  error
@@ -247,14 +238,12 @@ func (ds *RbacDAO) UpdateAccount(ctx context.Context, 
name string, account *rbac
                }
                opts = append(opts, opt)
        }
-       if datasource.EnableSync {
-               op, err := GenTaskOpts("", "", sync.UpdateAction, 
datasource.ResourceAccount, account)
-               if err != nil {
-                       log.Error("", err)
-                       return err
-               }
-               opts = append(opts, op)
+       syncOpts, err := esync.GenUpdateOpts(ctx, datasource.ResourceAccount, 
account)
+       if err != nil {
+               log.Error("fail to create sync opts", err)
+               return err
        }
+       opts = append(opts, syncOpts...)
        err = etcdadpt.Txn(ctx, opts)
        if err != nil {
                log.Error("BatchCommit failed", err)
@@ -262,7 +251,7 @@ func (ds *RbacDAO) UpdateAccount(ctx context.Context, name 
string, account *rbac
        return err
 }
 
-func hasRole(account *rbacmodel.Account, r string) bool {
+func hasRole(account *crbac.Account, r string) bool {
        for _, n := range account.Roles {
                if r == n {
                        return true
diff --git a/datasource/etcd/account_test.go b/datasource/etcd/account_test.go
index 47a3771..8284592 100644
--- a/datasource/etcd/account_test.go
+++ b/datasource/etcd/account_test.go
@@ -21,7 +21,7 @@ import (
        "context"
        "testing"
 
-       rbacmodel "github.com/go-chassis/cari/rbac"
+       crbac "github.com/go-chassis/cari/rbac"
        "github.com/stretchr/testify/assert"
 
        "github.com/apache/servicecomb-service-center/datasource"
@@ -39,7 +39,7 @@ func TestSyncAccount(t *testing.T) {
        t.Run("create account", func(t *testing.T) {
                t.Run("creating a account then delete it will create two tasks 
and a tombstone should pass",
                        func(t *testing.T) {
-                               a1 := rbacmodel.Account{
+                               a1 := crbac.Account{
                                        ID:                  
"sync-create-11111",
                                        Name:                
"sync-create-account1",
                                        Password:            "tnuocca-tset",
@@ -78,7 +78,7 @@ func TestSyncAccount(t *testing.T) {
        t.Run("update account", func(t *testing.T) {
                t.Run("creating two accounts then update them,finally delete 
them, will create six tasks and two tombstones should pass",
                        func(t *testing.T) {
-                               a2 := rbacmodel.Account{
+                               a2 := crbac.Account{
                                        ID:                  
"sync-update-22222",
                                        Name:                
"sync-update-account2",
                                        Password:            "tnuocca-tset",
@@ -86,7 +86,7 @@ func TestSyncAccount(t *testing.T) {
                                        TokenExpirationTime: "2020-12-30",
                                        CurrentPassword:     "tnuocca-tset",
                                }
-                               a3 := rbacmodel.Account{
+                               a3 := crbac.Account{
                                        ID:                  
"sync-update-33333",
                                        Name:                
"sync-update-account3",
                                        Password:            "tnuocca-tset",
diff --git a/datasource/etcd/dep.go b/datasource/etcd/dep.go
index 2a719a8..bf0eb74 100644
--- a/datasource/etcd/dep.go
+++ b/datasource/etcd/dep.go
@@ -24,14 +24,14 @@ import (
        "fmt"
 
        pb "github.com/go-chassis/cari/discovery"
-       "github.com/go-chassis/cari/sync"
        "github.com/little-cui/etcdadpt"
 
        "github.com/apache/servicecomb-service-center/datasource"
        "github.com/apache/servicecomb-service-center/datasource/etcd/event"
        "github.com/apache/servicecomb-service-center/datasource/etcd/path"
        "github.com/apache/servicecomb-service-center/datasource/etcd/sd"
-       serviceUtil 
"github.com/apache/servicecomb-service-center/datasource/etcd/util"
+       esync 
"github.com/apache/servicecomb-service-center/datasource/etcd/sync"
+       eutil 
"github.com/apache/servicecomb-service-center/datasource/etcd/util"
        "github.com/apache/servicecomb-service-center/pkg/log"
        "github.com/apache/servicecomb-service-center/pkg/util"
 )
@@ -42,7 +42,7 @@ type DepManager struct {
 func (dm *DepManager) SearchProviderDependency(ctx context.Context, request 
*pb.GetDependenciesRequest) (*pb.GetProDependenciesResponse, error) {
        domainProject := util.ParseDomainProject(ctx)
        providerServiceID := request.ServiceId
-       provider, err := serviceUtil.GetService(ctx, domainProject, 
providerServiceID)
+       provider, err := eutil.GetService(ctx, domainProject, providerServiceID)
 
        if err != nil {
                if errors.Is(err, datasource.ErrNoData) {
@@ -55,7 +55,7 @@ func (dm *DepManager) SearchProviderDependency(ctx 
context.Context, request *pb.
                return nil, err
        }
 
-       services, err := serviceUtil.GetConsumers(ctx, domainProject, provider, 
toDependencyFilterOptions(request)...)
+       services, err := eutil.GetConsumers(ctx, domainProject, provider, 
toDependencyFilterOptions(request)...)
        if err != nil {
                log.Error(fmt.Sprintf("query provider failed, provider is 
%s/%s/%s/%s",
                        provider.Environment, provider.AppId, 
provider.ServiceName, provider.Version), err)
@@ -73,7 +73,7 @@ func (dm *DepManager) SearchProviderDependency(ctx 
context.Context, request *pb.
 func (dm *DepManager) SearchConsumerDependency(ctx context.Context, request 
*pb.GetDependenciesRequest) (*pb.GetConDependenciesResponse, error) {
        consumerID := request.ServiceId
        domainProject := util.ParseDomainProject(ctx)
-       consumer, err := serviceUtil.GetService(ctx, domainProject, consumerID)
+       consumer, err := eutil.GetService(ctx, domainProject, consumerID)
 
        if err != nil {
                if errors.Is(err, datasource.ErrNoData) {
@@ -86,7 +86,7 @@ func (dm *DepManager) SearchConsumerDependency(ctx 
context.Context, request *pb.
                return nil, err
        }
 
-       services, err := serviceUtil.GetProviders(ctx, domainProject, consumer, 
toDependencyFilterOptions(request)...)
+       services, err := eutil.GetProviders(ctx, domainProject, consumer, 
toDependencyFilterOptions(request)...)
        if err != nil {
                log.Error(fmt.Sprintf("query consumer failed, consumer is 
%s/%s/%s/%s",
                        consumer.Environment, consumer.AppId, 
consumer.ServiceName, consumer.Version), err)
@@ -136,7 +136,7 @@ func (dm *DepManager) AddOrUpdateDependencies(ctx 
context.Context, dependencyInf
                        return rsp.Response, nil
                }
 
-               consumerID, err := serviceUtil.GetServiceID(ctx, consumerInfo)
+               consumerID, err := eutil.GetServiceID(ctx, consumerInfo)
                if err != nil {
                        log.Error(fmt.Sprintf("put request into dependency 
queue failed, override: %t, get consumer[%s] id failed",
                                override, consumerFlag), err)
@@ -164,21 +164,17 @@ func (dm *DepManager) AddOrUpdateDependencies(ctx 
context.Context, dependencyInf
                opts = append(opts, etcdadpt.OpPut(etcdadpt.WithStrKey(key), 
etcdadpt.WithValue(data)))
        }
 
-       if datasource.EnableSync {
-               action := sync.UpdateAction
-               if override {
-                       action = sync.CreateAction
-               }
-               domain := util.ParseDomain(ctx)
-               project := util.ParseProject(ctx)
-               taskOpt, err := GenTaskOpts(domain, project, action, 
datasource.ResourceDependency, dependencyInfos)
-               if err != nil {
-                       log.Error("fail to create task", err)
-                       return pb.CreateResponse(pb.ErrInternal, err.Error()), 
err
-               }
-               opts = append(opts, taskOpt)
+       syncOpts, err := esync.GenCreateOpts(ctx, 
datasource.ResourceDependency, dependencyInfos)
+       if !override {
+               syncOpts, err = esync.GenUpdateOpts(ctx, 
datasource.ResourceDependency, dependencyInfos)
        }
-       err := etcdadpt.Txn(ctx, opts)
+       if err != nil {
+               log.Error("fail to create sync opts", err)
+               return pb.CreateResponse(pb.ErrInternal, err.Error()), err
+       }
+       opts = append(opts, syncOpts...)
+
+       err = etcdadpt.Txn(ctx, opts)
        if err != nil {
                log.Error(fmt.Sprintf("put request into dependency queue 
failed, override: %t, %v",
                        override, dependencyInfos), err)
diff --git a/datasource/etcd/ms.go b/datasource/etcd/ms.go
index 2747834..ce5ecf4 100644
--- a/datasource/etcd/ms.go
+++ b/datasource/etcd/ms.go
@@ -25,25 +25,25 @@ import (
        "strconv"
        "time"
 
-       "github.com/go-chassis/cari/sync"
+       pb "github.com/go-chassis/cari/discovery"
+       "github.com/go-chassis/cari/pkg/errsvc"
+       "github.com/go-chassis/foundation/gopool"
+       "github.com/jinzhu/copier"
+       "github.com/little-cui/etcdadpt"
 
        "github.com/apache/servicecomb-service-center/datasource"
        "github.com/apache/servicecomb-service-center/datasource/etcd/cache"
        "github.com/apache/servicecomb-service-center/datasource/etcd/path"
        "github.com/apache/servicecomb-service-center/datasource/etcd/sd"
        
"github.com/apache/servicecomb-service-center/datasource/etcd/state/kvstore"
-       serviceUtil 
"github.com/apache/servicecomb-service-center/datasource/etcd/util"
+       esync 
"github.com/apache/servicecomb-service-center/datasource/etcd/sync"
+       eutil 
"github.com/apache/servicecomb-service-center/datasource/etcd/util"
        "github.com/apache/servicecomb-service-center/datasource/schema"
        "github.com/apache/servicecomb-service-center/pkg/log"
        "github.com/apache/servicecomb-service-center/pkg/util"
        "github.com/apache/servicecomb-service-center/server/core"
        "github.com/apache/servicecomb-service-center/server/plugin/uuid"
        quotasvc 
"github.com/apache/servicecomb-service-center/server/service/quota"
-       pb "github.com/go-chassis/cari/discovery"
-       "github.com/go-chassis/cari/pkg/errsvc"
-       "github.com/go-chassis/foundation/gopool"
-       "github.com/jinzhu/copier"
-       "github.com/little-cui/etcdadpt"
 )
 
 var (
@@ -127,18 +127,14 @@ func (ds *MetadataManager) RegisterService(ctx 
context.Context, request *pb.Crea
                failOpts = append(failOpts, 
etcdadpt.OpGet(etcdadpt.WithStrKey(alias)))
        }
 
-       if datasource.EnableSync {
-               domain := util.ParseDomain(ctx)
-               project := util.ParseProject(ctx)
-               taskOpt, err := GenTaskOpts(domain, project, sync.CreateAction, 
datasource.ResourceService, request)
-               if err != nil {
-                       log.Error("fail to create task", err)
-                       return &pb.CreateServiceResponse{
-                               Response: pb.CreateResponse(pb.ErrInternal, 
err.Error()),
-                       }, err
-               }
-               opts = append(opts, taskOpt)
+       syncOpts, err := esync.GenCreateOpts(ctx, datasource.ResourceService, 
request)
+       if err != nil {
+               log.Error("fail to create sync opts", err)
+               return &pb.CreateServiceResponse{
+                       Response: pb.CreateResponse(pb.ErrInternal, 
err.Error()),
+               }, err
        }
+       opts = append(opts, syncOpts...)
 
        resp, err := etcdadpt.TxnWithCmp(ctx, opts, uniqueCmpOpts, failOpts)
        if err != nil {
@@ -192,7 +188,7 @@ func (ds *MetadataManager) RegisterService(ctx 
context.Context, request *pb.Crea
 
 func (ds *MetadataManager) GetServices(ctx context.Context, request 
*pb.GetServicesRequest) (
        *pb.GetServicesResponse, error) {
-       services, err := serviceUtil.GetAllServiceUtil(ctx)
+       services, err := eutil.GetAllServiceUtil(ctx)
        if err != nil {
                log.Error("get all services by domain failed", err)
                return &pb.GetServicesResponse{
@@ -209,7 +205,7 @@ func (ds *MetadataManager) GetServices(ctx context.Context, 
request *pb.GetServi
 func (ds *MetadataManager) GetService(ctx context.Context, request 
*pb.GetServiceRequest) (
        *pb.MicroService, error) {
        domainProject := util.ParseDomainProject(ctx)
-       singleService, err := serviceUtil.GetService(ctx, domainProject, 
request.ServiceId)
+       singleService, err := eutil.GetService(ctx, domainProject, 
request.ServiceId)
 
        if err != nil {
                if errors.Is(err, datasource.ErrNoData) {
@@ -226,7 +222,7 @@ func (ds *MetadataManager) GetServiceDetail(ctx 
context.Context, request *pb.Get
        *pb.ServiceDetail, error) {
        domainProject := util.ParseDomainProject(ctx)
 
-       service, err := serviceUtil.GetService(ctx, domainProject, 
request.ServiceId)
+       service, err := eutil.GetService(ctx, domainProject, request.ServiceId)
        if err != nil {
                if errors.Is(err, datasource.ErrNoData) {
                        return nil, pb.NewError(pb.ErrServiceNotExists, 
"Service does not exist.")
@@ -297,7 +293,7 @@ func (ds *MetadataManager) ListServiceDetail(ctx 
context.Context, request *pb.Ge
        }
 
        //获取所有服务
-       services, err := serviceUtil.GetAllServiceUtil(ctx)
+       services, err := eutil.GetAllServiceUtil(ctx)
        if err != nil {
                log.Error("get all services by domain failed", err)
                return nil, pb.NewError(pb.ErrInternal, err.Error())
@@ -370,7 +366,7 @@ func (ds *MetadataManager) ListApp(ctx context.Context, 
request *pb.GetAppsReque
        domainProject := util.ParseDomainProject(ctx)
        key := path.GetServiceAppKey(domainProject, request.Environment, "")
 
-       opts := append(serviceUtil.FromContext(ctx),
+       opts := append(eutil.FromContext(ctx),
                etcdadpt.WithStrKey(key),
                etcdadpt.WithPrefix(),
                etcdadpt.WithKeyOnly())
@@ -410,7 +406,7 @@ func (ds *MetadataManager) ExistServiceByID(ctx 
context.Context, request *pb.Get
        domainProject := util.ParseDomainProject(ctx)
        return &pb.GetExistenceByIDResponse{
                Response: pb.CreateResponse(pb.ResponseSuccess, "Get all 
applications successfully."),
-               Exist:    serviceUtil.ServiceExist(ctx, domainProject, 
request.ServiceId),
+               Exist:    eutil.ServiceExist(ctx, domainProject, 
request.ServiceId),
        }, nil
 }
 
@@ -420,7 +416,7 @@ func (ds *MetadataManager) ExistService(ctx 
context.Context, request *pb.GetExis
        serviceFlag := util.StringJoin([]string{
                request.Environment, request.AppId, request.ServiceName, 
request.Version}, path.SPLIT)
 
-       ids, exist, err := serviceUtil.FindServiceIds(ctx, &pb.MicroServiceKey{
+       ids, exist, err := eutil.FindServiceIds(ctx, &pb.MicroServiceKey{
                Environment: request.Environment,
                AppId:       request.AppId,
                ServiceName: request.ServiceName,
@@ -458,7 +454,7 @@ func (ds *MetadataManager) UpdateService(ctx 
context.Context, request *pb.Update
        domainProject := util.ParseDomainProject(ctx)
 
        key := path.GenerateServiceKey(domainProject, request.ServiceId)
-       microservice, err := serviceUtil.GetService(ctx, domainProject, 
request.ServiceId)
+       microservice, err := eutil.GetService(ctx, domainProject, 
request.ServiceId)
        if err != nil {
                if errors.Is(err, datasource.ErrNoData) {
                        log.Debug(fmt.Sprintf("service does not exist, update 
service[%s] properties failed, operator: %s",
@@ -490,18 +486,14 @@ func (ds *MetadataManager) UpdateService(ctx 
context.Context, request *pb.Update
        opts := []etcdadpt.OpOptions{
                etcdadpt.OpPut(etcdadpt.WithStrKey(key), 
etcdadpt.WithValue(data)),
        }
-       if datasource.EnableSync {
-               domain := util.ParseDomain(ctx)
-               project := util.ParseProject(ctx)
-               taskOpt, err := GenTaskOpts(domain, project, sync.UpdateAction, 
datasource.ResourceService, request)
-               if err != nil {
-                       log.Error("fail to create task", err)
-                       return &pb.UpdateServicePropsResponse{
-                               Response: pb.CreateResponse(pb.ErrInternal, 
err.Error()),
-                       }, err
-               }
-               opts = append(opts, taskOpt)
+       syncOpts, err := esync.GenUpdateOpts(ctx, datasource.ResourceService, 
request)
+       if err != nil {
+               log.Error("fail to create task", err)
+               return &pb.UpdateServicePropsResponse{
+                       Response: pb.CreateResponse(pb.ErrInternal, 
err.Error()),
+               }, err
        }
+       opts = append(opts, syncOpts...)
 
        // Set key file
        resp, err := etcdadpt.TxnWithCmp(ctx, opts, 
etcdadpt.If(etcdadpt.NotEqualVer(key, 0)), nil)
@@ -660,7 +652,7 @@ func (ds *MetadataManager) sendHeartbeatInstead(ctx 
context.Context, instance *p
 
 func (ds *MetadataManager) ExistInstanceByID(ctx context.Context, request 
*pb.MicroServiceInstanceKey) (*pb.GetExistenceByIDResponse, error) {
        domainProject := util.ParseDomainProject(ctx)
-       exist, _ := serviceUtil.InstanceExist(ctx, domainProject, 
request.ServiceId, request.InstanceId)
+       exist, _ := eutil.InstanceExist(ctx, domainProject, request.ServiceId, 
request.InstanceId)
        if !exist {
                return &pb.GetExistenceByIDResponse{
                        Response: pb.CreateResponse(pb.ErrInstanceNotExists, 
"Check instance exist failed."),
@@ -680,7 +672,7 @@ func (ds *MetadataManager) GetInstance(ctx context.Context, 
request *pb.GetOneIn
        service := &pb.MicroService{}
        var err error
        if len(request.ConsumerServiceId) > 0 {
-               service, err = serviceUtil.GetService(ctx, domainProject, 
request.ConsumerServiceId)
+               service, err = eutil.GetService(ctx, domainProject, 
request.ConsumerServiceId)
                if err != nil {
                        if errors.Is(err, datasource.ErrNoData) {
                                log.Debug(fmt.Sprintf("consumer does not exist 
in db, consumer[%s] find provider instance[%s/%s]",
@@ -698,7 +690,7 @@ func (ds *MetadataManager) GetInstance(ctx context.Context, 
request *pb.GetOneIn
                }
        }
 
-       provider, err := serviceUtil.GetService(ctx, domainProject, 
request.ProviderServiceId)
+       provider, err := eutil.GetService(ctx, domainProject, 
request.ProviderServiceId)
        if err != nil {
                if errors.Is(err, datasource.ErrNoData) {
                        log.Debug(fmt.Sprintf("provider does not exist in db, 
consumer[%s] find provider instance[%s/%s]",
@@ -761,7 +753,7 @@ func (ds *MetadataManager) GetInstances(ctx 
context.Context, request *pb.GetInst
        service := &pb.MicroService{}
        var err error
        if len(request.ConsumerServiceId) > 0 {
-               service, err = serviceUtil.GetService(ctx, domainProject, 
request.ConsumerServiceId)
+               service, err = eutil.GetService(ctx, domainProject, 
request.ConsumerServiceId)
                if err != nil {
                        if errors.Is(err, datasource.ErrNoData) {
                                log.Debug(fmt.Sprintf("consumer does not exist 
in db, consumer[%s] find provider[%s] instances",
@@ -779,7 +771,7 @@ func (ds *MetadataManager) GetInstances(ctx 
context.Context, request *pb.GetInst
                }
        }
 
-       provider, err := serviceUtil.GetService(ctx, domainProject, 
request.ProviderServiceId)
+       provider, err := eutil.GetService(ctx, domainProject, 
request.ProviderServiceId)
        if err != nil {
                if errors.Is(err, datasource.ErrNoData) {
                        log.Debug(fmt.Sprintf("provider does not exist, 
consumer[%s] find provider[%s] instances",
@@ -844,7 +836,7 @@ func (ds *MetadataManager) GetProviderInstances(ctx 
context.Context, request *pb
        if err != nil {
                return
        }
-       return instances, serviceUtil.FormatRevision(maxRevs, counts), nil
+       return instances, eutil.FormatRevision(maxRevs, counts), nil
 }
 
 func (ds *MetadataManager) BatchGetProviderInstances(ctx context.Context, 
request *pb.BatchGetInstancesRequest) (instances []*pb.MicroServiceInstance, 
rev string, err error) {
@@ -865,12 +857,12 @@ func (ds *MetadataManager) BatchGetProviderInstances(ctx 
context.Context, reques
                instances = append(instances, insts...)
        }
 
-       return instances, serviceUtil.FormatRevision(maxRevs, counts), nil
+       return instances, eutil.FormatRevision(maxRevs, counts), nil
 }
 
 func (ds *MetadataManager) findInstances(ctx context.Context, domainProject, 
serviceID string, maxRevs []int64, counts []int64) (instances 
[]*pb.MicroServiceInstance, err error) {
        key := path.GenerateInstanceKey(domainProject, serviceID, "")
-       opts := append(serviceUtil.FromContext(ctx), etcdadpt.WithStrKey(key), 
etcdadpt.WithPrefix())
+       opts := append(eutil.FromContext(ctx), etcdadpt.WithStrKey(key), 
etcdadpt.WithPrefix())
        resp, err := sd.Instance().Search(ctx, opts...)
        if err != nil {
                return nil, err
@@ -922,7 +914,7 @@ func (ds *MetadataManager) findInstance(ctx 
context.Context, request *pb.FindIns
        domainProject := util.ParseDomainProject(ctx)
        service := &pb.MicroService{Environment: request.Environment}
        if len(request.ConsumerServiceId) > 0 {
-               service, err = serviceUtil.GetService(ctx, domainProject, 
request.ConsumerServiceId)
+               service, err = eutil.GetService(ctx, domainProject, 
request.ConsumerServiceId)
                if err != nil {
                        if errors.Is(err, datasource.ErrNoData) {
                                log.Debug(fmt.Sprintf("consumer does not exist, 
consumer[%s] find provider[%s/%s/%s]",
@@ -976,7 +968,7 @@ func (ds *MetadataManager) findInstance(ctx 
context.Context, request *pb.FindIns
                        return nil, err
                }
                if provider != nil {
-                       err = serviceUtil.AddServiceVersionRule(ctx, 
domainProject, service, provider)
+                       err = eutil.AddServiceVersionRule(ctx, domainProject, 
service, provider)
                } else {
                        err := fmt.Errorf("%s failed, provider does not exist", 
findFlag)
                        log.Error("AddServiceVersionRule failed", err)
@@ -1040,7 +1032,7 @@ func (ds *MetadataManager) genFindResult(ctx 
context.Context, oldRev string, ite
 func (ds *MetadataManager) reshapeProviderKey(ctx context.Context, provider 
*pb.MicroServiceKey, providerID string) (
        *pb.MicroServiceKey, error) {
        // service name 可能是别名,所以重新获取
-       providerService, err := serviceUtil.GetService(ctx, provider.Tenant, 
providerID)
+       providerService, err := eutil.GetService(ctx, provider.Tenant, 
providerID)
        if err != nil {
                return nil, err
        }
@@ -1054,7 +1046,7 @@ func (ds *MetadataManager) UpdateInstanceStatus(ctx 
context.Context, request *pb
        domainProject := util.ParseDomainProject(ctx)
        updateStatusFlag := util.StringJoin([]string{request.ServiceId, 
request.InstanceId, request.Status}, path.SPLIT)
 
-       instance, err := serviceUtil.GetInstance(ctx, domainProject, 
request.ServiceId, request.InstanceId)
+       instance, err := eutil.GetInstance(ctx, domainProject, 
request.ServiceId, request.InstanceId)
        if err != nil {
                log.Error(fmt.Sprintf("update instance[%s] status failed", 
updateStatusFlag), err)
                return &pb.UpdateInstanceStatusResponse{
@@ -1071,7 +1063,7 @@ func (ds *MetadataManager) UpdateInstanceStatus(ctx 
context.Context, request *pb
        copyInstanceRef := *instance
        copyInstanceRef.Status = request.Status
 
-       if err := serviceUtil.UpdateInstance(ctx, domainProject, 
&copyInstanceRef); err != nil {
+       if err := eutil.UpdateInstance(ctx, domainProject, &copyInstanceRef); 
err != nil {
                log.Error(fmt.Sprintf("update instance[%s] status failed", 
updateStatusFlag), err)
                resp := &pb.UpdateInstanceStatusResponse{
                        Response: pb.CreateResponseWithSCErr(err),
@@ -1093,7 +1085,7 @@ func (ds *MetadataManager) UpdateInstanceProperties(ctx 
context.Context, request
        domainProject := util.ParseDomainProject(ctx)
        instanceFlag := util.StringJoin([]string{request.ServiceId, 
request.InstanceId}, path.SPLIT)
 
-       instance, err := serviceUtil.GetInstance(ctx, domainProject, 
request.ServiceId, request.InstanceId)
+       instance, err := eutil.GetInstance(ctx, domainProject, 
request.ServiceId, request.InstanceId)
        if err != nil {
                log.Error(fmt.Sprintf("update instance[%s] properties failed", 
instanceFlag), err)
                return &pb.UpdateInstancePropsResponse{
@@ -1110,7 +1102,7 @@ func (ds *MetadataManager) UpdateInstanceProperties(ctx 
context.Context, request
        copyInstanceRef := *instance
        copyInstanceRef.Properties = request.Properties
 
-       if err := serviceUtil.UpdateInstance(ctx, domainProject, 
&copyInstanceRef); err != nil {
+       if err := eutil.UpdateInstance(ctx, domainProject, &copyInstanceRef); 
err != nil {
                log.Error(fmt.Sprintf("update instance[%s] properties failed", 
instanceFlag), err)
                resp := &pb.UpdateInstancePropsResponse{
                        Response: pb.CreateResponseWithSCErr(err),
@@ -1222,7 +1214,7 @@ func (ds *MetadataManager) batchFindServices(ctx 
context.Context, request *pb.Ba
                        return nil, err
                }
                failed, ok := failedResult[resp.Response.GetCode()]
-               serviceUtil.AppendFindResponse(findCtx, int64(index), 
resp.Response, resp.Instances,
+               eutil.AppendFindResponse(findCtx, int64(index), resp.Response, 
resp.Instances,
                        &services.Updated, &services.NotModified, &failed)
                if !ok && failed != nil {
                        failedResult[resp.Response.GetCode()] = failed
@@ -1255,7 +1247,7 @@ func (ds *MetadataManager) batchFindInstances(ctx 
context.Context, request *pb.B
                        return nil, err
                }
                failed, ok := failedResult[resp.Response.GetCode()]
-               serviceUtil.AppendFindResponse(getCtx, int64(index), 
resp.Response, []*pb.MicroServiceInstance{resp.Instance},
+               eutil.AppendFindResponse(getCtx, int64(index), resp.Response, 
[]*pb.MicroServiceInstance{resp.Instance},
                        &instances.Updated, &instances.NotModified, &failed)
                if !ok && failed != nil {
                        failedResult[resp.Response.GetCode()] = failed
@@ -1300,7 +1292,7 @@ func (ds *MetadataManager) Heartbeat(ctx context.Context, 
request *pb.HeartbeatR
        domainProject := util.ParseDomainProject(ctx)
        instanceFlag := util.StringJoin([]string{request.ServiceId, 
request.InstanceId}, path.SPLIT)
 
-       _, ttl, err := serviceUtil.HeartbeatUtil(ctx, domainProject, 
request.ServiceId, request.InstanceId)
+       _, ttl, err := eutil.HeartbeatUtil(ctx, domainProject, 
request.ServiceId, request.InstanceId)
        if err != nil {
                log.Error(fmt.Sprintf("heartbeat failed, instance[%s]. operator 
%s",
                        instanceFlag, remoteIP), err)
@@ -1329,7 +1321,7 @@ func (ds *MetadataManager) Heartbeat(ctx context.Context, 
request *pb.HeartbeatR
 func (ds *MetadataManager) GetAllInstances(ctx context.Context, request 
*pb.GetAllInstancesRequest) (*pb.GetAllInstancesResponse, error) {
        domainProject := util.ParseDomainProject(ctx)
        key := path.GetInstanceRootKey(domainProject) + path.SPLIT
-       opts := append(serviceUtil.FromContext(ctx), etcdadpt.WithStrKey(key), 
etcdadpt.WithPrefix())
+       opts := append(eutil.FromContext(ctx), etcdadpt.WithStrKey(key), 
etcdadpt.WithPrefix())
        kvs, err := sd.Instance().Search(ctx, opts...)
        if err != nil {
                return nil, err
@@ -1354,7 +1346,7 @@ func (ds *MetadataManager) ModifySchemas(ctx 
context.Context, request *pb.Modify
        serviceID := request.ServiceId
        domainProject := util.ParseDomainProject(ctx)
 
-       serviceInfo, err := serviceUtil.GetService(ctx, domainProject, 
serviceID)
+       serviceInfo, err := eutil.GetService(ctx, domainProject, serviceID)
        if err != nil {
                if errors.Is(err, datasource.ErrNoData) {
                        log.Debug(fmt.Sprintf("modify service[%s] schemas 
failed, service does not exist in db, operator: %s",
@@ -1403,7 +1395,7 @@ func (ds *MetadataManager) ExistSchema(ctx 
context.Context, request *pb.GetExist
        *pb.GetExistenceResponse, error) {
        domainProject := util.ParseDomainProject(ctx)
 
-       if !serviceUtil.ServiceExist(ctx, domainProject, request.ServiceId) {
+       if !eutil.ServiceExist(ctx, domainProject, request.ServiceId) {
                log.Warn(fmt.Sprintf("schema[%s/%s] exist failed, service does 
not exist", request.ServiceId, request.SchemaId))
                return nil, pb.NewError(pb.ErrServiceNotExists, "service does 
not exist.")
        }
@@ -1434,14 +1426,14 @@ func (ds *MetadataManager) ExistSchema(ctx 
context.Context, request *pb.GetExist
 func (ds *MetadataManager) GetSchema(ctx context.Context, request 
*pb.GetSchemaRequest) (*pb.GetSchemaResponse, error) {
        domainProject := util.ParseDomainProject(ctx)
 
-       if !serviceUtil.ServiceExist(ctx, domainProject, request.ServiceId) {
+       if !eutil.ServiceExist(ctx, domainProject, request.ServiceId) {
                log.Error(fmt.Sprintf("get schema[%s/%s] failed, service does 
not exist",
                        request.ServiceId, request.SchemaId), nil)
                return nil, pb.NewError(pb.ErrServiceNotExists, "Service does 
not exist.")
        }
 
        key := path.GenerateServiceSchemaKey(domainProject, request.ServiceId, 
request.SchemaId)
-       opts := append(serviceUtil.FromContext(ctx), etcdadpt.WithStrKey(key))
+       opts := append(eutil.FromContext(ctx), etcdadpt.WithStrKey(key))
        resp, errDo := sd.Schema().Search(ctx, opts...)
        if errDo != nil {
                log.Error(fmt.Sprintf("get schema[%s/%s] failed", 
request.ServiceId, request.SchemaId), errDo)
@@ -1471,7 +1463,7 @@ func (ds *MetadataManager) GetAllSchemas(ctx 
context.Context, request *pb.GetAll
        *pb.GetAllSchemaResponse, error) {
        domainProject := util.ParseDomainProject(ctx)
 
-       service, err := serviceUtil.GetService(ctx, domainProject, 
request.ServiceId)
+       service, err := eutil.GetService(ctx, domainProject, request.ServiceId)
        if err != nil {
                if errors.Is(err, datasource.ErrNoData) {
                        log.Debug(fmt.Sprintf("get service[%s] all schemas 
failed, service does not exist in db", request.ServiceId))
@@ -1490,7 +1482,7 @@ func (ds *MetadataManager) GetAllSchemas(ctx 
context.Context, request *pb.GetAll
        }
 
        key := path.GenerateServiceSchemaSummaryKey(domainProject, 
request.ServiceId, "")
-       opts := append(serviceUtil.FromContext(ctx), etcdadpt.WithStrKey(key), 
etcdadpt.WithPrefix())
+       opts := append(eutil.FromContext(ctx), etcdadpt.WithStrKey(key), 
etcdadpt.WithPrefix())
        resp, errDo := sd.SchemaSummary().Search(ctx, opts...)
        if errDo != nil {
                log.Error(fmt.Sprintf("get service[%s] all schema summaries 
failed", request.ServiceId), errDo)
@@ -1500,7 +1492,7 @@ func (ds *MetadataManager) GetAllSchemas(ctx 
context.Context, request *pb.GetAll
        respWithSchema := &kvstore.Response{}
        if request.WithSchema {
                key := path.GenerateServiceSchemaKey(domainProject, 
request.ServiceId, "")
-               opts := append(serviceUtil.FromContext(ctx), 
etcdadpt.WithStrKey(key), etcdadpt.WithPrefix())
+               opts := append(eutil.FromContext(ctx), 
etcdadpt.WithStrKey(key), etcdadpt.WithPrefix())
                respWithSchema, errDo = sd.Schema().Search(ctx, opts...)
                if errDo != nil {
                        log.Error(fmt.Sprintf("get service[%s] all schemas 
failed", request.ServiceId), errDo)
@@ -1540,7 +1532,7 @@ func (ds *MetadataManager) DeleteSchema(ctx 
context.Context, request *pb.DeleteS
        domainProject := util.ParseDomainProject(ctx)
 
        key := path.GenerateServiceSchemaKey(domainProject, request.ServiceId, 
request.SchemaId)
-       exist, err := serviceUtil.CheckSchemaInfoExist(ctx, key)
+       exist, err := eutil.CheckSchemaInfoExist(ctx, key)
        if err != nil {
                log.Error(fmt.Sprintf("delete schema[%s/%s] failed, operator: 
%s",
                        request.ServiceId, request.SchemaId, remoteIP), err)
@@ -1582,7 +1574,7 @@ func (ds *MetadataManager) AddTags(ctx context.Context, 
request *pb.AddServiceTa
        domainProject := util.ParseDomainProject(ctx)
 
        // service id存在性校验
-       if !serviceUtil.ServiceExist(ctx, domainProject, request.ServiceId) {
+       if !eutil.ServiceExist(ctx, domainProject, request.ServiceId) {
                log.Error(fmt.Sprintf("add service[%s]'s tags %v failed, 
service does not exist, operator: %s",
                        request.ServiceId, request.Tags, remoteIP), nil)
                return &pb.AddServiceTagsResponse{
@@ -1590,7 +1582,7 @@ func (ds *MetadataManager) AddTags(ctx context.Context, 
request *pb.AddServiceTa
                }, nil
        }
 
-       checkErr := serviceUtil.AddTagIntoETCD(ctx, domainProject, 
request.ServiceId, request.Tags)
+       checkErr := eutil.AddTagIntoETCD(ctx, domainProject, request.ServiceId, 
request.Tags)
        if checkErr != nil {
                log.Error(fmt.Sprintf("add service[%s]'s tags %v failed, 
operator: %s",
                        request.ServiceId, request.Tags, remoteIP), checkErr)
@@ -1612,13 +1604,13 @@ func (ds *MetadataManager) AddTags(ctx context.Context, 
request *pb.AddServiceTa
 func (ds *MetadataManager) GetTags(ctx context.Context, request 
*pb.GetServiceTagsRequest) (*pb.GetServiceTagsResponse, error) {
        var err error
        domainProject := util.ParseDomainProject(ctx)
-       if !serviceUtil.ServiceExist(ctx, domainProject, request.ServiceId) {
+       if !eutil.ServiceExist(ctx, domainProject, request.ServiceId) {
                log.Error(fmt.Sprintf("get service[%s]'s tags failed, service 
does not exist", request.ServiceId), err)
                return &pb.GetServiceTagsResponse{
                        Response: pb.CreateResponse(pb.ErrServiceNotExists, 
"Service does not exist."),
                }, nil
        }
-       tags, err := serviceUtil.GetTagsUtils(ctx, domainProject, 
request.ServiceId)
+       tags, err := eutil.GetTagsUtils(ctx, domainProject, request.ServiceId)
        if err != nil {
                log.Error(fmt.Sprintf("get service[%s]'s tags failed, get tags 
failed", request.ServiceId), err)
                return &pb.GetServiceTagsResponse{
@@ -1638,7 +1630,7 @@ func (ds *MetadataManager) UpdateTag(ctx context.Context, 
request *pb.UpdateServ
        tagFlag := util.StringJoin([]string{request.Key, request.Value}, 
path.SPLIT)
        domainProject := util.ParseDomainProject(ctx)
 
-       if !serviceUtil.ServiceExist(ctx, domainProject, request.ServiceId) {
+       if !eutil.ServiceExist(ctx, domainProject, request.ServiceId) {
                log.Error(fmt.Sprintf("update service[%s]'s tag[%s] failed, 
service does not exist, operator: %s",
                        request.ServiceId, tagFlag, remoteIP), err)
                return &pb.UpdateServiceTagResponse{
@@ -1646,7 +1638,7 @@ func (ds *MetadataManager) UpdateTag(ctx context.Context, 
request *pb.UpdateServ
                }, nil
        }
 
-       tags, err := serviceUtil.GetTagsUtils(ctx, domainProject, 
request.ServiceId)
+       tags, err := eutil.GetTagsUtils(ctx, domainProject, request.ServiceId)
        if err != nil {
                log.Error(fmt.Sprintf("update service[%s]'s tag[%s] failed, get 
tag failed, operator: %s",
                        request.ServiceId, tagFlag, remoteIP), err)
@@ -1670,7 +1662,7 @@ func (ds *MetadataManager) UpdateTag(ctx context.Context, 
request *pb.UpdateServ
        }
        copyTags[request.Key] = request.Value
 
-       checkErr := serviceUtil.AddTagIntoETCD(ctx, domainProject, 
request.ServiceId, copyTags)
+       checkErr := eutil.AddTagIntoETCD(ctx, domainProject, request.ServiceId, 
copyTags)
        if checkErr != nil {
                log.Error(fmt.Sprintf("update service[%s]'s tag[%s] failed, 
operator: %s",
                        request.ServiceId, tagFlag, remoteIP), checkErr)
@@ -1693,7 +1685,7 @@ func (ds *MetadataManager) DeleteTags(ctx 
context.Context, request *pb.DeleteSer
        remoteIP := util.GetIPFromContext(ctx)
        domainProject := util.ParseDomainProject(ctx)
 
-       if !serviceUtil.ServiceExist(ctx, domainProject, request.ServiceId) {
+       if !eutil.ServiceExist(ctx, domainProject, request.ServiceId) {
                log.Error(fmt.Sprintf("delete service[%s]'s tags %v failed, 
service does not exist, operator: %s",
                        request.ServiceId, request.Keys, remoteIP), nil)
                return &pb.DeleteServiceTagsResponse{
@@ -1701,7 +1693,7 @@ func (ds *MetadataManager) DeleteTags(ctx 
context.Context, request *pb.DeleteSer
                }, nil
        }
 
-       tags, err := serviceUtil.GetTagsUtils(ctx, domainProject, 
request.ServiceId)
+       tags, err := eutil.GetTagsUtils(ctx, domainProject, request.ServiceId)
        if err != nil {
                log.Error(fmt.Sprintf("delete service[%s]'s tags %v failed, get 
service tags failed, operator: %s",
                        request.ServiceId, request.Keys, remoteIP), err)
@@ -1737,10 +1729,18 @@ func (ds *MetadataManager) DeleteTags(ctx 
context.Context, request *pb.DeleteSer
 
        key := path.GenerateServiceTagKey(domainProject, request.ServiceId)
 
-       resp, err := etcdadpt.TxnWithCmp(ctx,
-               etcdadpt.Ops(etcdadpt.OpPut(etcdadpt.WithStrKey(key), 
etcdadpt.WithValue(data))),
-               
etcdadpt.If(etcdadpt.NotEqualVer(path.GenerateServiceKey(domainProject, 
request.ServiceId), 0)),
-               nil)
+       opts := etcdadpt.Ops(etcdadpt.OpPut(etcdadpt.WithStrKey(key), 
etcdadpt.WithValue(data)))
+       syncOpts, err := esync.GenDeleteOpts(ctx, datasource.ResourceKV, key, 
data,
+               esync.WithOpts(map[string]string{"key": key}))
+       if err != nil {
+               return &pb.DeleteServiceTagsResponse{
+                       Response: pb.CreateResponse(pb.ErrInternal, 
err.Error()),
+               }, err
+       }
+       opts = append(opts, syncOpts...)
+
+       resp, err := etcdadpt.TxnWithCmp(ctx, opts,
+               
etcdadpt.If(etcdadpt.NotEqualVer(path.GenerateServiceKey(domainProject, 
request.ServiceId), 0)), nil)
        if err != nil {
                log.Error(fmt.Sprintf("delete service[%s]'s tags %v failed, 
operator: %s",
                        request.ServiceId, request.Keys, remoteIP), err)
@@ -1786,7 +1786,7 @@ func (ds *MetadataManager) modifySchemas(ctx 
context.Context, domainProject stri
                        }
 
                        service.Schemas = nonExistSchemaIds
-                       opt, err := serviceUtil.UpdateService(domainProject, 
serviceID, service)
+                       opt, err := eutil.UpdateService(domainProject, 
serviceID, service)
                        if err != nil {
                                log.Error(fmt.Sprintf("modify service[%s] 
schemas failed, update service.Schemas failed, operator: %s",
                                        serviceID, remoteIP), err)
@@ -1851,7 +1851,7 @@ func (ds *MetadataManager) modifySchemas(ctx 
context.Context, domainProject stri
                }
 
                service.Schemas = schemaIDs
-               opt, err := serviceUtil.UpdateService(domainProject, serviceID, 
service)
+               opt, err := eutil.UpdateService(domainProject, serviceID, 
service)
                if err != nil {
                        log.Error(fmt.Sprintf("modify service[%s] schemas 
failed, update service.Schemas failed, operator: %s",
                                serviceID, remoteIP), err)
@@ -1883,7 +1883,7 @@ func (ds *MetadataManager) modifySchema(ctx 
context.Context, serviceID string, s
        domainProject := util.ParseDomainProject(ctx)
        schemaID := schema.SchemaId
 
-       microService, err := serviceUtil.GetService(ctx, domainProject, 
serviceID)
+       microService, err := eutil.GetService(ctx, domainProject, serviceID)
        if err != nil {
                if errors.Is(err, datasource.ErrNoData) {
                        log.Debug(fmt.Sprintf("microService does not exist, 
modify schema[%s/%s] failed, operator: %s",
@@ -1933,7 +1933,7 @@ func (ds *MetadataManager) modifySchema(ctx 
context.Context, serviceID string, s
 
                if len(microService.Schemas) == 0 {
                        microService.Schemas = append(microService.Schemas, 
schemaID)
-                       opt, err := serviceUtil.UpdateService(domainProject, 
serviceID, microService)
+                       opt, err := eutil.UpdateService(domainProject, 
serviceID, microService)
                        if err != nil {
                                log.Error(fmt.Sprintf("modify schema[%s/%s] 
failed, update microService.Schemas failed, operator: %s",
                                        serviceID, schemaID, remoteIP), err)
@@ -1944,7 +1944,7 @@ func (ds *MetadataManager) modifySchema(ctx 
context.Context, serviceID string, s
        } else {
                if !isExist {
                        microService.Schemas = append(microService.Schemas, 
schemaID)
-                       opt, err := serviceUtil.UpdateService(domainProject, 
serviceID, microService)
+                       opt, err := eutil.UpdateService(domainProject, 
serviceID, microService)
                        if err != nil {
                                log.Error(fmt.Sprintf("modify schema[%s/%s] 
failed, update microService.Schemas failed, operator: %s",
                                        serviceID, schemaID, remoteIP), err)
@@ -1984,7 +1984,7 @@ func (ds *MetadataManager) DeleteServicePri(ctx 
context.Context, serviceID strin
                return pb.CreateResponse(pb.ErrInvalidParams, err.Error()), nil
        }
 
-       microservice, err := serviceUtil.GetService(ctx, domainProject, 
serviceID)
+       microservice, err := eutil.GetService(ctx, domainProject, serviceID)
        if err != nil {
                if errors.Is(err, datasource.ErrNoData) {
                        log.Debug(fmt.Sprintf("service does not exist, %s 
micro-service[%s] failed, operator: %s",
@@ -1998,7 +1998,7 @@ func (ds *MetadataManager) DeleteServicePri(ctx 
context.Context, serviceID strin
 
        // 强制删除,则与该服务相关的信息删除,非强制删除: 
如果作为该被依赖(作为provider,提供服务,且不是只存在自依赖)或者存在实例,则不能删除
        if !force {
-               services, err := serviceUtil.GetConsumerIds(ctx, domainProject, 
microservice)
+               services, err := eutil.GetConsumerIds(ctx, domainProject, 
microservice)
                if err != nil {
                        log.Error(fmt.Sprintf("delete micro-service[%s] failed, 
get service dependency failed, operator: %s",
                                serviceID, remoteIP), err)
@@ -2042,26 +2042,16 @@ func (ds *MetadataManager) DeleteServicePri(ctx 
context.Context, serviceID strin
                
etcdadpt.OpDel(etcdadpt.WithStrKey(path.GenerateServiceAliasKey(serviceKey))),
                etcdadpt.OpDel(etcdadpt.WithStrKey(serviceIDKey)),
        }
-
-       if datasource.EnableSync {
-               domain := util.ParseDomain(ctx)
-               project := util.ParseProject(ctx)
-               taskOpt, err := GenTaskOpts(domain, project, sync.DeleteAction, 
datasource.ResourceService,
-                       &pb.DeleteServiceRequest{ServiceId: serviceID, Force: 
force})
-               if err != nil {
-                       log.Error("fail to create task", err)
-                       return pb.CreateResponse(pb.ErrInternal, err.Error()), 
err
-               }
-               tombstoneOpt, err := GenTombstoneOpts(domain, project, 
datasource.ResourceService, serviceID)
-               if err != nil {
-                       log.Error("fail to create tombstone", err)
-                       return pb.CreateResponse(pb.ErrInternal, err.Error()), 
err
-               }
-               opts = append(opts, taskOpt, tombstoneOpt)
+       syncOpts, err := esync.GenDeleteOpts(ctx, datasource.ResourceService, 
serviceID,
+               &pb.DeleteServiceRequest{ServiceId: serviceID, Force: force})
+       if err != nil {
+               log.Error("fail to sync opt", err)
+               return pb.CreateResponse(pb.ErrInternal, err.Error()), err
        }
+       opts = append(opts, syncOpts...)
 
        //删除依赖规则
-       optDeleteDep, err := 
serviceUtil.DeleteDependencyForDeleteService(domainProject, serviceID, 
serviceKey)
+       optDeleteDep, err := 
eutil.DeleteDependencyForDeleteService(domainProject, serviceID, serviceKey)
        if err != nil {
                log.Error(fmt.Sprintf("%s micro-service[%s] failed, delete 
dependency failed, operator: %s",
                        title, serviceID, remoteIP), err)
@@ -2093,7 +2083,7 @@ func (ds *MetadataManager) DeleteServicePri(ctx 
context.Context, serviceID strin
                etcdadpt.WithPrefix()))
 
        //删除实例
-       err = serviceUtil.DeleteServiceAllInstances(ctx, serviceID)
+       err = eutil.DeleteServiceAllInstances(ctx, serviceID)
        if err != nil {
                log.Error(fmt.Sprintf("%s micro-service[%s] failed, revoke all 
instances failed, operator: %s",
                        title, serviceID, remoteIP), err)
diff --git a/datasource/etcd/role.go b/datasource/etcd/role.go
index 9e117d9..d2a3735 100644
--- a/datasource/etcd/role.go
+++ b/datasource/etcd/role.go
@@ -24,19 +24,19 @@ import (
        "strconv"
        "time"
 
-       rbacmodel "github.com/go-chassis/cari/rbac"
-       "github.com/go-chassis/cari/sync"
+       crbac "github.com/go-chassis/cari/rbac"
        "github.com/little-cui/etcdadpt"
 
        "github.com/apache/servicecomb-service-center/datasource"
        "github.com/apache/servicecomb-service-center/datasource/etcd/path"
+       esync 
"github.com/apache/servicecomb-service-center/datasource/etcd/sync"
        "github.com/apache/servicecomb-service-center/datasource/rbac"
        "github.com/apache/servicecomb-service-center/pkg/etcdsync"
        "github.com/apache/servicecomb-service-center/pkg/log"
        "github.com/apache/servicecomb-service-center/pkg/util"
 )
 
-func (rm *RbacDAO) CreateRole(ctx context.Context, r *rbacmodel.Role) error {
+func (rm *RbacDAO) CreateRole(ctx context.Context, r *crbac.Role) error {
        lock, err := etcdsync.Lock("/role-creating/"+r.Name, -1, false)
        if err != nil {
                return fmt.Errorf("role %s is creating", r.Name)
@@ -67,14 +67,12 @@ func (rm *RbacDAO) CreateRole(ctx context.Context, r 
*rbacmodel.Role) error {
        opts := []etcdadpt.OpOptions{
                etcdadpt.OpPut(etcdadpt.WithStrKey(key), 
etcdadpt.WithValue(value)),
        }
-       if datasource.EnableSync {
-               taskOpt, err := GenTaskOpts("", "", sync.CreateAction, 
datasource.ResourceRole, r)
-               if err != nil {
-                       log.Error("", err)
-                       return err
-               }
-               opts = append(opts, taskOpt)
+       syncOpts, err := esync.GenCreateOpts(ctx, datasource.ResourceRole, r)
+       if err != nil {
+               log.Error("fail to create sync opts", err)
+               return err
        }
+       opts = append(opts, syncOpts...)
        err = etcdadpt.Txn(ctx, opts)
        if err != nil {
                log.Error("can not save account info", err)
@@ -88,7 +86,7 @@ func (rm *RbacDAO) RoleExist(ctx context.Context, name 
string) (bool, error) {
        return etcdadpt.Exist(ctx, path.GenerateRBACRoleKey(name))
 }
 
-func (rm *RbacDAO) GetRole(ctx context.Context, name string) (*rbacmodel.Role, 
error) {
+func (rm *RbacDAO) GetRole(ctx context.Context, name string) (*crbac.Role, 
error) {
        kv, err := etcdadpt.Get(ctx, path.GenerateRBACRoleKey(name))
        if err != nil {
                return nil, err
@@ -96,7 +94,7 @@ func (rm *RbacDAO) GetRole(ctx context.Context, name string) 
(*rbacmodel.Role, e
        if kv == nil {
                return nil, rbac.ErrRoleNotExist
        }
-       role := &rbacmodel.Role{}
+       role := &crbac.Role{}
        err = json.Unmarshal(kv.Value, role)
        if err != nil {
                log.Error("role info format invalid", err)
@@ -104,14 +102,14 @@ func (rm *RbacDAO) GetRole(ctx context.Context, name 
string) (*rbacmodel.Role, e
        }
        return role, nil
 }
-func (rm *RbacDAO) ListRole(ctx context.Context) ([]*rbacmodel.Role, int64, 
error) {
+func (rm *RbacDAO) ListRole(ctx context.Context) ([]*crbac.Role, int64, error) 
{
        kvs, n, err := etcdadpt.List(ctx, path.GenerateRBACRoleKey(""))
        if err != nil {
                return nil, 0, err
        }
-       roles := make([]*rbacmodel.Role, 0, n)
+       roles := make([]*crbac.Role, 0, n)
        for _, v := range kvs {
-               r := &rbacmodel.Role{}
+               r := &crbac.Role{}
                err = json.Unmarshal(v.Value, r)
                if err != nil {
                        log.Error("role info format invalid:", err)
@@ -132,19 +130,12 @@ func (rm *RbacDAO) DeleteRole(ctx context.Context, name 
string) (bool, error) {
                return false, rbac.ErrRoleBindingExist
        }
        opts := 
[]etcdadpt.OpOptions{etcdadpt.OpDel(etcdadpt.WithStrKey(path.GenerateRBACRoleKey(name)))}
-       if datasource.EnableSync {
-               taskOpt, err := GenTaskOpts("", "", sync.DeleteAction, 
datasource.ResourceRole, name)
-               if err != nil {
-                       log.Error("", err)
-                       return false, err
-               }
-               tombstoneOpt, err := GenTombstoneOpts("", "", 
datasource.ResourceRole, name)
-               if err != nil {
-                       log.Error("", err)
-                       return false, err
-               }
-               opts = append(opts, taskOpt, tombstoneOpt)
+       syncOpts, err := esync.GenDeleteOpts(ctx, datasource.ResourceRole, 
name, name)
+       if err != nil {
+               log.Error("fail to create sync opts", err)
+               return false, err
        }
+       opts = append(opts, syncOpts...)
        err = etcdadpt.Txn(ctx, opts)
        if err != nil {
                return false, err
@@ -159,7 +150,7 @@ func RoleBindingExists(ctx context.Context, role string) 
(bool, error) {
        }
        return total > 0, nil
 }
-func (rm *RbacDAO) UpdateRole(ctx context.Context, name string, role 
*rbacmodel.Role) error {
+func (rm *RbacDAO) UpdateRole(ctx context.Context, name string, role 
*crbac.Role) error {
        role.UpdateTime = strconv.FormatInt(time.Now().Unix(), 10)
        value, err := json.Marshal(role)
        if err != nil {
@@ -169,13 +160,11 @@ func (rm *RbacDAO) UpdateRole(ctx context.Context, name 
string, role *rbacmodel.
        opts := []etcdadpt.OpOptions{
                
etcdadpt.OpPut(etcdadpt.WithStrKey(path.GenerateRBACRoleKey(name)), 
etcdadpt.WithValue(value)),
        }
-       if datasource.EnableSync {
-               taskOpt, err := GenTaskOpts("", "", sync.UpdateAction, 
datasource.ResourceRole, role)
-               if err != nil {
-                       log.Error("", err)
-                       return err
-               }
-               opts = append(opts, taskOpt)
+       syncOpts, err := esync.GenUpdateOpts(ctx, datasource.ResourceRole, role)
+       if err != nil {
+               log.Error("fail to create sync opts", err)
+               return err
        }
+       opts = append(opts, syncOpts...)
        return etcdadpt.Txn(ctx, opts)
 }
diff --git a/datasource/etcd/role_test.go b/datasource/etcd/role_test.go
index 0a447c5..a9af11f 100644
--- a/datasource/etcd/role_test.go
+++ b/datasource/etcd/role_test.go
@@ -22,7 +22,7 @@ import (
        "strconv"
        "testing"
 
-       rbacmodel "github.com/go-chassis/cari/rbac"
+       crbac "github.com/go-chassis/cari/rbac"
        "github.com/stretchr/testify/assert"
 
        "github.com/apache/servicecomb-service-center/datasource"
@@ -39,7 +39,7 @@ func TestSyncRole(t *testing.T) {
 
        t.Run("create role", func(t *testing.T) {
                t.Run("creating a role and delete it will create two tasks and 
a tombstone should pass", func(t *testing.T) {
-                       r1 := rbacmodel.Role{
+                       r1 := crbac.Role{
                                ID:    "create-11111",
                                Name:  "create-role",
                                Perms: nil,
@@ -78,12 +78,12 @@ func TestSyncRole(t *testing.T) {
        t.Run("update role", func(t *testing.T) {
                t.Run("create two roles ,then update them, finally delete them, 
will create six tasks and two tombstones should pass",
                        func(t *testing.T) {
-                               r2 := rbacmodel.Role{
+                               r2 := crbac.Role{
                                        ID:    "update-22222",
                                        Name:  "update-role-22222",
                                        Perms: nil,
                                }
-                               r3 := rbacmodel.Role{
+                               r3 := crbac.Role{
                                        ID:    "update-33333",
                                        Name:  "update-role-33333",
                                        Perms: nil,
diff --git a/datasource/etcd/sync/sync.go b/datasource/etcd/sync/sync.go
new file mode 100644
index 0000000..28f4112
--- /dev/null
+++ b/datasource/etcd/sync/sync.go
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package sync
+
+import (
+       "context"
+       "encoding/json"
+
+       "github.com/go-chassis/cari/sync"
+       "github.com/little-cui/etcdadpt"
+
+       "github.com/apache/servicecomb-service-center/datasource"
+       
"github.com/apache/servicecomb-service-center/eventbase/datasource/etcd/key"
+       "github.com/apache/servicecomb-service-center/pkg/util"
+)
+
+type Options struct {
+       ResourceID string
+       Opts       map[string]string
+}
+
+type Option func(options *Options)
+
+func NewSyncOptions() Options {
+       return Options{}
+}
+
+func WithResourceID(resourceID string) Option {
+       return func(options *Options) {
+               options.ResourceID = resourceID
+       }
+}
+
+func WithOpts(opts map[string]string) Option {
+       return func(options *Options) {
+               options.Opts = opts
+       }
+}
+
+func GenCreateOpts(ctx context.Context, resourceType string, resource 
interface{},
+       options ...Option) ([]etcdadpt.OpOptions, error) {
+       return genOpts(ctx, sync.CreateAction, resourceType, resource, 
options...)
+}
+
+func GenUpdateOpts(ctx context.Context, resourceType string, resource 
interface{},
+       options ...Option) ([]etcdadpt.OpOptions, error) {
+       return genOpts(ctx, sync.UpdateAction, resourceType, resource, 
options...)
+}
+
+func GenDeleteOpts(ctx context.Context, resourceType, resourceID string, 
resource interface{},
+       options ...Option) ([]etcdadpt.OpOptions, error) {
+       options = append(options, WithResourceID(resourceID))
+       return genOpts(ctx, sync.DeleteAction, resourceType, resource, 
options...)
+
+}
+
+func genOpts(ctx context.Context, action string, resourceType string, resource 
interface{},
+       options ...Option) ([]etcdadpt.OpOptions, error) {
+       if !datasource.EnableSync {
+               return nil, nil
+       }
+       syncOpts := NewSyncOptions()
+       for _, option := range options {
+               option(&syncOpts)
+       }
+       taskOpt, err := genTaskOpt(ctx, action, resourceType, resource, 
&syncOpts)
+       if err != nil {
+               return nil, err
+       }
+       if action != sync.DeleteAction {
+               return []etcdadpt.OpOptions{taskOpt}, nil
+       }
+       tombstoneOpt, err := genTombstoneOpt(ctx, resourceType, 
syncOpts.ResourceID)
+       if err != nil {
+               return nil, err
+       }
+       return []etcdadpt.OpOptions{taskOpt, tombstoneOpt}, nil
+}
+
+func genTaskOpt(ctx context.Context, action string, resourceType string, 
resource interface{},
+       syncOpts *Options) (etcdadpt.OpOptions, error) {
+       domain := util.ParseDomain(ctx)
+       project := util.ParseProject(ctx)
+       task, err := sync.NewTask(domain, project, action, resourceType, 
resource)
+       if err != nil {
+               return etcdadpt.OpOptions{}, err
+       }
+       if syncOpts.Opts != nil {
+               task.Opts = syncOpts.Opts
+       }
+       taskBytes, err := json.Marshal(task)
+       if err != nil {
+               return etcdadpt.OpOptions{}, err
+       }
+       taskOpPut := etcdadpt.OpPut(etcdadpt.WithStrKey(key.TaskKey(domain, 
project,
+               task.ID, task.Timestamp)), etcdadpt.WithValue(taskBytes))
+       return taskOpPut, nil
+}
+
+func genTombstoneOpt(ctx context.Context, resourceType, resourceID string) 
(etcdadpt.OpOptions, error) {
+       domain := util.ParseDomain(ctx)
+       project := util.ParseProject(ctx)
+       tombstone := sync.NewTombstone(domain, project, resourceType, 
resourceID)
+       tombstoneBytes, err := json.Marshal(tombstone)
+       if err != nil {
+               return etcdadpt.OpOptions{}, err
+       }
+       tombstoneOpPut := 
etcdadpt.OpPut(etcdadpt.WithStrKey(key.TombstoneKey(domain, project, 
tombstone.ResourceType,
+               tombstone.ResourceID)), etcdadpt.WithValue(tombstoneBytes))
+       return tombstoneOpPut, nil
+}
diff --git a/datasource/etcd/sync/sync_test.go 
b/datasource/etcd/sync/sync_test.go
new file mode 100644
index 0000000..c723ba4
--- /dev/null
+++ b/datasource/etcd/sync/sync_test.go
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package sync_test
+
+import (
+       "context"
+       "testing"
+
+       pb "github.com/go-chassis/cari/discovery"
+       "github.com/stretchr/testify/assert"
+
+       "github.com/apache/servicecomb-service-center/datasource"
+       "github.com/apache/servicecomb-service-center/datasource/etcd/sync"
+       "github.com/apache/servicecomb-service-center/pkg/util"
+)
+
+func optsContext() context.Context {
+       return util.WithNoCache(util.SetDomainProject(context.Background(), 
"sync-opts",
+               "sync-opts"))
+}
+
+func TestOpts(t *testing.T) {
+       datasource.EnableSync = true
+
+       t.Run("create func will create a task opt should pass", func(t 
*testing.T) {
+               opts, err := sync.GenCreateOpts(optsContext(), 
datasource.ResourceService, &pb.CreateServiceRequest{})
+               assert.Nil(t, err)
+               assert.Equal(t, 1, len(opts))
+       })
+
+       t.Run("update func will create a task opt should pass", func(t 
*testing.T) {
+               opts, err := sync.GenUpdateOpts(optsContext(), 
datasource.ResourceService, &pb.UpdateServicePropsRequest{})
+               assert.Nil(t, err)
+               assert.Equal(t, 1, len(opts))
+       })
+
+       t.Run("delete func will create a task and a tombstone should pass", 
func(t *testing.T) {
+               opts, err := sync.GenDeleteOpts(optsContext(), 
datasource.ResourceService, "11111", &pb.DeleteServiceRequest{})
+               assert.Nil(t, err)
+               assert.Equal(t, 2, len(opts))
+       })
+       datasource.EnableSync = false
+}
diff --git a/datasource/etcd/tag_test.go b/datasource/etcd/tag_test.go
new file mode 100644
index 0000000..829fbe1
--- /dev/null
+++ b/datasource/etcd/tag_test.go
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package etcd_test
+
+import (
+       "context"
+       "testing"
+
+       pb "github.com/go-chassis/cari/discovery"
+       "github.com/go-chassis/cari/sync"
+       "github.com/stretchr/testify/assert"
+
+       "github.com/apache/servicecomb-service-center/datasource"
+       "github.com/apache/servicecomb-service-center/eventbase/model"
+       "github.com/apache/servicecomb-service-center/eventbase/service/task"
+       
"github.com/apache/servicecomb-service-center/eventbase/service/tombstone"
+       "github.com/apache/servicecomb-service-center/pkg/util"
+       _ "github.com/apache/servicecomb-service-center/test"
+)
+
+func tagContext() context.Context {
+       return util.WithNoCache(util.SetDomainProject(context.Background(), 
"sync-tag",
+               "sync-tag"))
+}
+
+func TestSyncTag(t *testing.T) {
+       var serviceID string
+       datasource.EnableSync = true
+       t.Run("create service", func(t *testing.T) {
+               t.Run("register a micro service will create a task should 
pass", func(t *testing.T) {
+                       resp, err := 
datasource.GetMetadataManager().RegisterService(tagContext(), 
&pb.CreateServiceRequest{
+                               Service: &pb.MicroService{
+                                       AppId:       "sync_tag_group",
+                                       ServiceName: "sync_micro_service_tag",
+                                       Version:     "1.0.0",
+                                       Level:       "FRONT",
+                                       Status:      pb.MS_UP,
+                               },
+                       })
+                       assert.NotNil(t, resp)
+                       assert.NoError(t, err)
+                       assert.Equal(t, pb.ResponseSuccess, 
resp.Response.GetCode())
+                       serviceID = resp.ServiceId
+                       listTaskReq := model.ListTaskRequest{
+                               Domain:       "sync-tag",
+                               Project:      "sync-tag",
+                               ResourceType: datasource.ResourceService,
+                               Action:       sync.CreateAction,
+                               Status:       sync.PendingStatus,
+                       }
+                       tasks, err := task.List(tagContext(), &listTaskReq)
+                       assert.NoError(t, err)
+                       assert.Equal(t, 1, len(tasks))
+                       err = task.Delete(tagContext(), tasks...)
+                       assert.NoError(t, err)
+                       tasks, err = task.List(tagContext(), &listTaskReq)
+                       assert.NoError(t, err)
+                       assert.Equal(t, 0, len(tasks))
+               })
+       })
+
+       t.Run("add tags", func(t *testing.T) {
+               t.Run("add tags for a service will create a task should pass", 
func(t *testing.T) {
+                       respAddTages, err := 
datasource.GetMetadataManager().AddTags(tagContext(), &pb.AddServiceTagsRequest{
+                               ServiceId: serviceID,
+                               Tags: map[string]string{
+                                       "a": "test",
+                                       "b": "b",
+                               },
+                       })
+                       assert.NoError(t, err)
+                       assert.Equal(t, pb.ResponseSuccess, 
respAddTages.Response.GetCode())
+                       listTaskReq := model.ListTaskRequest{
+                               Domain:       "sync-tag",
+                               Project:      "sync-tag",
+                               ResourceType: datasource.ResourceKV,
+                               Action:       sync.UpdateAction,
+                               Status:       sync.PendingStatus,
+                       }
+                       tasks, err := task.List(tagContext(), &listTaskReq)
+                       assert.NoError(t, err)
+                       assert.Equal(t, 1, len(tasks))
+                       err = task.Delete(tagContext(), tasks...)
+                       assert.NoError(t, err)
+                       tasks, err = task.List(tagContext(), &listTaskReq)
+                       assert.NoError(t, err)
+                       assert.Equal(t, 0, len(tasks))
+               })
+       })
+
+       t.Run("update a tag", func(t *testing.T) {
+               t.Run("update a service tag will create a task should pass", 
func(t *testing.T) {
+                       resp, err := 
datasource.GetMetadataManager().UpdateTag(tagContext(), 
&pb.UpdateServiceTagRequest{
+                               ServiceId: serviceID,
+                               Key:       "a",
+                               Value:     "update",
+                       })
+                       assert.NoError(t, err)
+                       assert.Equal(t, pb.ResponseSuccess, 
resp.Response.GetCode())
+                       listTaskReq := model.ListTaskRequest{
+                               Domain:       "sync-tag",
+                               Project:      "sync-tag",
+                               ResourceType: datasource.ResourceKV,
+                               Action:       sync.UpdateAction,
+                               Status:       sync.PendingStatus,
+                       }
+                       tasks, err := task.List(tagContext(), &listTaskReq)
+                       assert.NoError(t, err)
+                       assert.Equal(t, 1, len(tasks))
+                       err = task.Delete(tagContext(), tasks...)
+                       assert.NoError(t, err)
+                       tasks, err = task.List(tagContext(), &listTaskReq)
+                       assert.NoError(t, err)
+                       assert.Equal(t, 0, len(tasks))
+               })
+       })
+
+       t.Run("delete tags", func(t *testing.T) {
+               t.Run("delete a service's tags will create a task and a 
tombstone should pass", func(t *testing.T) {
+                       resp, err := 
datasource.GetMetadataManager().DeleteTags(tagContext(), 
&pb.DeleteServiceTagsRequest{
+                               ServiceId: serviceID,
+                               Keys:      []string{"a", "b"},
+                       })
+                       assert.NoError(t, err)
+                       assert.Equal(t, pb.ResponseSuccess, 
resp.Response.GetCode())
+                       respGetTags, err := 
datasource.GetMetadataManager().GetTags(tagContext(), &pb.GetServiceTagsRequest{
+                               ServiceId: serviceID,
+                       })
+                       assert.NoError(t, err)
+                       assert.Equal(t, pb.ResponseSuccess, 
resp.Response.GetCode())
+                       assert.Equal(t, "", respGetTags.Tags["a"])
+                       assert.Equal(t, "", respGetTags.Tags["b"])
+                       listTaskReq := model.ListTaskRequest{
+                               Domain:       "sync-tag",
+                               Project:      "sync-tag",
+                               ResourceType: datasource.ResourceKV,
+                               Action:       sync.DeleteAction,
+                               Status:       sync.PendingStatus,
+                       }
+                       tasks, err := task.List(tagContext(), &listTaskReq)
+                       assert.NoError(t, err)
+                       assert.Equal(t, 1, len(tasks))
+                       err = task.Delete(tagContext(), tasks...)
+                       assert.NoError(t, err)
+                       tasks, err = task.List(tagContext(), &listTaskReq)
+                       assert.NoError(t, err)
+                       assert.Equal(t, 0, len(tasks))
+                       tombstoneListReq := model.ListTombstoneRequest{
+                               Domain:       "sync-tag",
+                               Project:      "sync-tag",
+                               ResourceType: datasource.ResourceKV,
+                       }
+                       tombstones, err := tombstone.List(tagContext(), 
&tombstoneListReq)
+                       assert.NoError(t, err)
+                       assert.Equal(t, 1, len(tombstones))
+                       err = tombstone.Delete(tagContext(), tombstones...)
+                       assert.NoError(t, err)
+               })
+       })
+
+       t.Run("unregister service", func(t *testing.T) {
+               t.Run("unregister a service will create a task and a tombstone 
should pass", func(t *testing.T) {
+                       resp, err := 
datasource.GetMetadataManager().UnregisterService(tagContext(), 
&pb.DeleteServiceRequest{
+                               ServiceId: serviceID,
+                               Force:     true,
+                       })
+                       assert.NotNil(t, resp)
+                       assert.NoError(t, err)
+                       assert.Equal(t, pb.ResponseSuccess, 
resp.Response.GetCode())
+                       listTaskReq := model.ListTaskRequest{
+                               Domain:       "sync-tag",
+                               Project:      "sync-tag",
+                               ResourceType: datasource.ResourceService,
+                               Action:       sync.DeleteAction,
+                               Status:       sync.PendingStatus,
+                       }
+                       tasks, err := task.List(tagContext(), &listTaskReq)
+                       assert.NoError(t, err)
+                       assert.Equal(t, 1, len(tasks))
+                       err = task.Delete(tagContext(), tasks...)
+                       assert.NoError(t, err)
+                       tasks, err = task.List(tagContext(), &listTaskReq)
+                       assert.NoError(t, err)
+                       assert.Equal(t, 0, len(tasks))
+                       tombstoneListReq := model.ListTombstoneRequest{
+                               Domain:       "sync-tag",
+                               Project:      "sync-tag",
+                               ResourceType: datasource.ResourceService,
+                       }
+                       tombstones, err := tombstone.List(tagContext(), 
&tombstoneListReq)
+                       assert.NoError(t, err)
+                       assert.Equal(t, 1, len(tombstones))
+                       err = tombstone.Delete(tagContext(), tombstones...)
+                       assert.NoError(t, err)
+               })
+       })
+
+       datasource.EnableSync = false
+}
diff --git a/datasource/etcd/task_util.go b/datasource/etcd/task_util.go
deleted file mode 100644
index 232a841..0000000
--- a/datasource/etcd/task_util.go
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package etcd
-
-import (
-       "encoding/json"
-
-       "github.com/go-chassis/cari/sync"
-       "github.com/little-cui/etcdadpt"
-
-       
"github.com/apache/servicecomb-service-center/eventbase/datasource/etcd/key"
-)
-
-func GenTaskOpts(domain, project, action, resourceType string, resource 
interface{}) (etcdadpt.OpOptions, error) {
-       task, err := sync.NewTask(domain, project, action, resourceType, 
resource)
-       if err != nil {
-               return etcdadpt.OpOptions{}, err
-       }
-       taskBytes, err := json.Marshal(task)
-       if err != nil {
-               return etcdadpt.OpOptions{}, err
-       }
-       taskOpPut := etcdadpt.OpPut(etcdadpt.WithStrKey(key.TaskKey(domain, 
project,
-               task.ID, task.Timestamp)), etcdadpt.WithValue(taskBytes))
-       return taskOpPut, nil
-}
diff --git a/datasource/etcd/tombstone_util.go 
b/datasource/etcd/tombstone_util.go
deleted file mode 100644
index 6a968c0..0000000
--- a/datasource/etcd/tombstone_util.go
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package etcd
-
-import (
-       "encoding/json"
-
-       "github.com/go-chassis/cari/sync"
-       "github.com/little-cui/etcdadpt"
-
-       
"github.com/apache/servicecomb-service-center/eventbase/datasource/etcd/key"
-)
-
-func GenTombstoneOpts(domain, project, resourceType, resourceID string) 
(etcdadpt.OpOptions, error) {
-       tombstone := sync.NewTombstone(domain, project, resourceType, 
resourceID)
-       tombstoneBytes, err := json.Marshal(tombstone)
-       if err != nil {
-               return etcdadpt.OpOptions{}, err
-       }
-       tombstoneOpPut := 
etcdadpt.OpPut(etcdadpt.WithStrKey(key.TombstoneKey(domain, project, 
tombstone.ResourceType,
-               tombstone.ResourceID)), etcdadpt.WithValue(tombstoneBytes))
-       return tombstoneOpPut, nil
-}
diff --git a/datasource/etcd/util/tag_util.go b/datasource/etcd/util/tag_util.go
index a65fcc7..1b33f80 100644
--- a/datasource/etcd/util/tag_util.go
+++ b/datasource/etcd/util/tag_util.go
@@ -22,12 +22,15 @@ import (
        "encoding/json"
        "fmt"
 
-       "github.com/apache/servicecomb-service-center/datasource/etcd/path"
-       "github.com/apache/servicecomb-service-center/datasource/etcd/sd"
-       "github.com/apache/servicecomb-service-center/pkg/log"
        "github.com/go-chassis/cari/discovery"
        "github.com/go-chassis/cari/pkg/errsvc"
        "github.com/little-cui/etcdadpt"
+
+       "github.com/apache/servicecomb-service-center/datasource"
+       "github.com/apache/servicecomb-service-center/datasource/etcd/path"
+       "github.com/apache/servicecomb-service-center/datasource/etcd/sd"
+       esync 
"github.com/apache/servicecomb-service-center/datasource/etcd/sync"
+       "github.com/apache/servicecomb-service-center/pkg/log"
 )
 
 func AddTagIntoETCD(ctx context.Context, domainProject string, serviceID 
string, dataTags map[string]string) *errsvc.Error {
@@ -37,10 +40,14 @@ func AddTagIntoETCD(ctx context.Context, domainProject 
string, serviceID string,
                return discovery.NewError(discovery.ErrInternal, err.Error())
        }
 
-       resp, err := etcdadpt.TxnWithCmp(ctx,
-               etcdadpt.Ops(etcdadpt.OpPut(etcdadpt.WithStrKey(key), 
etcdadpt.WithValue(data))),
-               
etcdadpt.If(etcdadpt.NotEqualVer(path.GenerateServiceKey(domainProject, 
serviceID), 0)),
-               nil)
+       opts := etcdadpt.Ops(etcdadpt.OpPut(etcdadpt.WithStrKey(key), 
etcdadpt.WithValue(data)))
+       syncOpts, err := esync.GenUpdateOpts(ctx, datasource.ResourceKV, data, 
esync.WithOpts(map[string]string{"key": key}))
+       if err != nil {
+               return discovery.NewError(discovery.ErrInternal, err.Error())
+       }
+       opts = append(opts, syncOpts...)
+       resp, err := etcdadpt.TxnWithCmp(ctx, opts,
+               
etcdadpt.If(etcdadpt.NotEqualVer(path.GenerateServiceKey(domainProject, 
serviceID), 0)), nil)
        if err != nil {
                return discovery.NewError(discovery.ErrUnavailableBackend, 
err.Error())
        }

Reply via email to