This is an automated email from the ASF dual-hosted git repository.
littlecui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/servicecomb-service-center.git
The following commit(s) were added to refs/heads/master by this push:
new 63e45f6 [bugfix] missing sync task when sync is enabled (#1262)
63e45f6 is described below
commit 63e45f6e367dfb222789f28faa87e411fe0177bb
Author: robotljw <[email protected]>
AuthorDate: Tue Feb 15 16:53:07 2022 +0800
[bugfix] missing sync task when sync is enabled (#1262)
---
datasource/etcd/ms.go | 34 ++++++++------
datasource/etcd/schema.go | 20 +++++++--
datasource/etcd/schema_test.go | 3 +-
datasource/etcd/sync_test.go | 3 +-
datasource/etcd/util.go | 74 +++++++++++++++++++++++++------
datasource/etcd/util/dependency_util.go | 19 ++++++--
datasource/etcd/util/microservice_util.go | 23 +++++++---
server/service/rbac/rbac.go | 11 +++--
8 files changed, 139 insertions(+), 48 deletions(-)
diff --git a/datasource/etcd/ms.go b/datasource/etcd/ms.go
index d20c6ee..917f270 100644
--- a/datasource/etcd/ms.go
+++ b/datasource/etcd/ms.go
@@ -1428,13 +1428,13 @@ func (ds *MetadataManager) modifySchemas(ctx
context.Context, domainProject stri
}
service.Schemas = nonExistSchemaIds
- opt, err := eutil.UpdateService(domainProject,
serviceID, service)
+ opts, err := eutil.UpdateService(ctx, domainProject,
serviceID, service)
if err != nil {
log.Error(fmt.Sprintf("modify service[%s]
schemas failed, update service.Schemas failed, operator: %s",
serviceID, remoteIP), err)
return pb.NewError(pb.ErrInternal, err.Error())
}
- pluginOps = append(pluginOps, opt)
+ pluginOps = append(pluginOps, opts...)
} else {
if len(nonExistSchemaIds) != 0 {
errInfo := fmt.Errorf("non-existent schemaIDs
%v", nonExistSchemaIds)
@@ -1447,7 +1447,10 @@ func (ds *MetadataManager) modifySchemas(ctx
context.Context, domainProject stri
return pb.NewError(pb.ErrInternal,
err.Error())
}
if !exist {
- opts :=
schemaWithDatabaseOpera(etcdadpt.OpPut, domainProject, serviceID,
needUpdateSchema)
+ opts, err := putSchema(ctx,
domainProject, serviceID, needUpdateSchema)
+ if err != nil {
+ return
pb.NewError(pb.ErrInternal, err.Error())
+ }
pluginOps = append(pluginOps, opts...)
} else {
log.Warn(fmt.Sprintf("schema[%s/%s] and
it's summary already exist, skip to update, operator: %s",
@@ -1458,7 +1461,7 @@ func (ds *MetadataManager) modifySchemas(ctx
context.Context, domainProject stri
for _, schema := range needAddSchemas {
log.Info(fmt.Sprintf("add new schema[%s/%s], operator:
%s", serviceID, schema.SchemaId, remoteIP))
- opts := schemaWithDatabaseOpera(etcdadpt.OpPut,
domainProject, service.ServiceId, schema)
+ opts, _ := putSchema(ctx, domainProject,
service.ServiceId, schema)
pluginOps = append(pluginOps, opts...)
}
} else {
@@ -1474,32 +1477,32 @@ func (ds *MetadataManager) modifySchemas(ctx
context.Context, domainProject stri
var schemaIDs []string
for _, schema := range needAddSchemas {
log.Info(fmt.Sprintf("add new schema[%s/%s], operator:
%s", serviceID, schema.SchemaId, remoteIP))
- opts := schemaWithDatabaseOpera(etcdadpt.OpPut,
domainProject, service.ServiceId, schema)
+ opts, _ := putSchema(ctx, domainProject,
service.ServiceId, schema)
pluginOps = append(pluginOps, opts...)
schemaIDs = append(schemaIDs, schema.SchemaId)
}
for _, schema := range needUpdateSchemas {
log.Info(fmt.Sprintf("update schema[%s/%s], operator:
%s", serviceID, schema.SchemaId, remoteIP))
- opts := schemaWithDatabaseOpera(etcdadpt.OpPut,
domainProject, serviceID, schema)
+ opts, _ := putSchema(ctx, domainProject, serviceID,
schema)
pluginOps = append(pluginOps, opts...)
schemaIDs = append(schemaIDs, schema.SchemaId)
}
for _, schema := range needDeleteSchemas {
log.Info(fmt.Sprintf("delete non-existent
schema[%s/%s], operator: %s", serviceID, schema.SchemaId, remoteIP))
- opts := schemaWithDatabaseOpera(etcdadpt.OpDel,
domainProject, serviceID, schema)
+ opts, _ := deleteSchema(ctx, domainProject, serviceID,
schema)
pluginOps = append(pluginOps, opts...)
}
service.Schemas = schemaIDs
- opt, err := eutil.UpdateService(domainProject, serviceID,
service)
+ opts, err := eutil.UpdateService(ctx, domainProject, serviceID,
service)
if err != nil {
log.Error(fmt.Sprintf("modify service[%s] schemas
failed, update service.Schemas failed, operator: %s",
serviceID, remoteIP), err)
return pb.NewError(pb.ErrInternal, err.Error())
}
- pluginOps = append(pluginOps, opt)
+ pluginOps = append(pluginOps, opts...)
}
if len(pluginOps) != 0 {
@@ -1575,28 +1578,31 @@ func (ds *MetadataManager) modifySchema(ctx
context.Context, serviceID string, s
if len(microService.Schemas) == 0 {
microService.Schemas = append(microService.Schemas,
schemaID)
- opt, err := eutil.UpdateService(domainProject,
serviceID, microService)
+ opts, err := eutil.UpdateService(ctx, domainProject,
serviceID, microService)
if err != nil {
log.Error(fmt.Sprintf("modify schema[%s/%s]
failed, update microService.Schemas failed, operator: %s",
serviceID, schemaID, remoteIP), err)
return pb.NewError(pb.ErrInternal, err.Error())
}
- pluginOps = append(pluginOps, opt)
+ pluginOps = append(pluginOps, opts...)
}
} else {
if !isExist {
microService.Schemas = append(microService.Schemas,
schemaID)
- opt, err := eutil.UpdateService(domainProject,
serviceID, microService)
+ opts, err := eutil.UpdateService(ctx, domainProject,
serviceID, microService)
if err != nil {
log.Error(fmt.Sprintf("modify schema[%s/%s]
failed, update microService.Schemas failed, operator: %s",
serviceID, schemaID, remoteIP), err)
return pb.NewError(pb.ErrInternal, err.Error())
}
- pluginOps = append(pluginOps, opt)
+ pluginOps = append(pluginOps, opts...)
}
}
- opts := commitSchemaInfo(domainProject, serviceID, schema)
+ opts, err := commitSchemaInfo(ctx, domainProject, serviceID, schema)
+ if err != nil {
+ return pb.NewError(pb.ErrInternal, err.Error())
+ }
pluginOps = append(pluginOps, opts...)
resp, err := etcdadpt.TxnWithCmp(ctx, pluginOps,
diff --git a/datasource/etcd/schema.go b/datasource/etcd/schema.go
index 186749a..f32c5eb 100644
--- a/datasource/etcd/schema.go
+++ b/datasource/etcd/schema.go
@@ -253,6 +253,12 @@ func (dao *SchemaDAO) PutContent(ctx context.Context,
contentRequest *schema.Put
serviceKey := path.GenerateServiceKey(domainProject, serviceID)
existContentOptions = append(existContentOptions,
etcdadpt.OpPut(etcdadpt.WithStrKey(serviceKey),
etcdadpt.WithValue(body)))
+ syncOpts, err := sync.GenUpdateOpts(ctx, datasource.ResourceKV,
body, sync.WithOpts(map[string]string{"key": serviceKey}))
+ if err != nil {
+ log.Error("fail to create update opts", err)
+ return err
+ }
+ existContentOptions = append(existContentOptions, syncOpts...)
}
newContentOptions := append(existContentOptions,
etcdadpt.OpPut(etcdadpt.WithStrKey(contentKey),
etcdadpt.WithStrValue(content.Content)))
@@ -389,14 +395,22 @@ func (dao *SchemaDAO) DeleteContent(ctx context.Context,
contentRequest *schema.
log.Error(fmt.Sprintf("schema[%s] is reference by service",
hash), nil)
return discovery.NewError(discovery.ErrInvalidParams, "Schema
has reference.")
}
-
contentKey := path.GenerateServiceSchemaContentKey(domainProject, hash)
- success, err := etcdadpt.Delete(ctx, contentKey)
+ opts := []etcdadpt.OpOptions{
+ etcdadpt.OpDel(etcdadpt.WithStrKey(contentKey)),
+ }
+ delOpts, err := sync.GenDeleteOpts(ctx, datasource.ResourceKV,
contentKey, contentKey, sync.WithOpts(map[string]string{"key": contentKey}))
+ if err != nil {
+ log.Error("fail to create del opts", err)
+ return err
+ }
+ opts = append(opts, delOpts...)
+ resp, err := etcdadpt.TxnWithCmp(ctx, opts,
etcdadpt.If(etcdadpt.ExistKey(contentKey)), nil)
if err != nil {
log.Error(fmt.Sprintf("delete schema content[%s] failed",
hash), err)
return err
}
- if !success {
+ if !resp.Succeeded {
log.Error(fmt.Sprintf("delete schema content[%s] failed",
hash), schema.ErrSchemaContentNotFound)
return schema.ErrSchemaContentNotFound
}
diff --git a/datasource/etcd/schema_test.go b/datasource/etcd/schema_test.go
index c26b947..9106c57 100644
--- a/datasource/etcd/schema_test.go
+++ b/datasource/etcd/schema_test.go
@@ -113,7 +113,8 @@ func TestSyncSchema(t *testing.T) {
}
tasks, err := task.List(schemaContext(), &listTaskReq)
assert.NoError(t, err)
- assert.Equal(t, 3, len(tasks))
+ // when the schemaID is new, appending it to service.Schemas
also creates a kv task
+ assert.Equal(t, 4, len(tasks))
err = task.Delete(schemaContext(), tasks...)
assert.NoError(t, err)
})
diff --git a/datasource/etcd/sync_test.go b/datasource/etcd/sync_test.go
index 2bc8521..1fe4603 100644
--- a/datasource/etcd/sync_test.go
+++ b/datasource/etcd/sync_test.go
@@ -200,7 +200,8 @@ func TestSyncAll(t *testing.T) {
}
tasks, err := task.List(syncAllContext(), &listTaskReq)
assert.NoError(t, err)
- assert.Equal(t, 3, len(tasks))
+ // when the schemaID is new, appending it to service.Schemas
also creates a kv task
+ assert.Equal(t, 4, len(tasks))
err = task.Delete(syncAllContext(), tasks...)
assert.NoError(t, err)
})
diff --git a/datasource/etcd/util.go b/datasource/etcd/util.go
index 2c641e0..25e9fd3 100644
--- a/datasource/etcd/util.go
+++ b/datasource/etcd/util.go
@@ -22,16 +22,18 @@ import (
"fmt"
"strings"
+ pb "github.com/go-chassis/cari/discovery"
+ "github.com/go-chassis/cari/pkg/errsvc"
+ "github.com/go-chassis/foundation/gopool"
+ "github.com/little-cui/etcdadpt"
+
"github.com/apache/servicecomb-service-center/datasource"
"github.com/apache/servicecomb-service-center/datasource/etcd/path"
"github.com/apache/servicecomb-service-center/datasource/etcd/sd"
+ "github.com/apache/servicecomb-service-center/datasource/etcd/sync"
serviceUtil
"github.com/apache/servicecomb-service-center/datasource/etcd/util"
"github.com/apache/servicecomb-service-center/pkg/log"
"github.com/apache/servicecomb-service-center/pkg/util"
- pb "github.com/go-chassis/cari/discovery"
- "github.com/go-chassis/cari/pkg/errsvc"
- "github.com/go-chassis/foundation/gopool"
- "github.com/little-cui/etcdadpt"
)
type ServiceDetailOpt struct {
@@ -105,15 +107,50 @@ func isExistSchemaSummary(ctx context.Context,
domainProject, serviceID, schemaI
return true, nil
}
-func schemaWithDatabaseOpera(invoke etcdadpt.Operation, domainProject string,
serviceID string, schema *pb.Schema) []etcdadpt.OpOptions {
- pluginOps := make([]etcdadpt.OpOptions, 0)
+func putSchema(ctx context.Context, domainProject string, serviceID string,
schema *pb.Schema) ([]etcdadpt.OpOptions, error) {
+ opts := make([]etcdadpt.OpOptions, 0)
key := path.GenerateServiceSchemaKey(domainProject, serviceID,
schema.SchemaId)
- opt := invoke(etcdadpt.WithStrKey(key),
etcdadpt.WithStrValue(schema.Schema))
- pluginOps = append(pluginOps, opt)
+ onPutOpt := etcdadpt.OpPut(etcdadpt.WithStrKey(key),
etcdadpt.WithStrValue(schema.Schema))
+ opts = append(opts, onPutOpt)
+ syncOpts, err := sync.GenUpdateOpts(ctx, datasource.ResourceKV,
schema.Schema, sync.WithOpts(map[string]string{"key": key}))
+ if err != nil {
+ log.Error("fail to create update opts", err)
+ return opts, err
+ }
+ opts = append(opts, syncOpts...)
keySummary := path.GenerateServiceSchemaSummaryKey(domainProject,
serviceID, schema.SchemaId)
- opt = invoke(etcdadpt.WithStrKey(keySummary),
etcdadpt.WithStrValue(schema.Summary))
- pluginOps = append(pluginOps, opt)
- return pluginOps
+ onPutOpt = etcdadpt.OpPut(etcdadpt.WithStrKey(keySummary),
etcdadpt.WithStrValue(schema.Summary))
+ opts = append(opts, onPutOpt)
+ syncOpts, err = sync.GenUpdateOpts(ctx, datasource.ResourceKV,
schema.Summary, sync.WithOpts(map[string]string{"key": keySummary}))
+ if err != nil {
+ log.Error("fail to create update opts", err)
+ return opts, err
+ }
+ opts = append(opts, syncOpts...)
+ return opts, nil
+}
+
+func deleteSchema(ctx context.Context, domainProject string, serviceID string,
schema *pb.Schema) ([]etcdadpt.OpOptions, error) {
+ opts := make([]etcdadpt.OpOptions, 0)
+ key := path.GenerateServiceSchemaKey(domainProject, serviceID,
schema.SchemaId)
+ onDelOpt := etcdadpt.OpDel(etcdadpt.WithStrKey(key),
etcdadpt.WithStrValue(schema.Schema))
+ opts = append(opts, onDelOpt)
+ syncOpts, err := sync.GenDeleteOpts(ctx, datasource.ResourceKV, key,
schema.Schema, sync.WithOpts(map[string]string{"key": key}))
+ if err != nil {
+ log.Error("fail to create update opts", err)
+ return opts, err
+ }
+ opts = append(opts, syncOpts...)
+ keySummary := path.GenerateServiceSchemaSummaryKey(domainProject,
serviceID, schema.SchemaId)
+ onDelOpt = etcdadpt.OpDel(etcdadpt.WithStrKey(keySummary),
etcdadpt.WithStrValue(schema.Summary))
+ opts = append(opts, onDelOpt)
+ syncOpts, err = sync.GenDeleteOpts(ctx, datasource.ResourceKV,
keySummary, schema.Summary, sync.WithOpts(map[string]string{"key": keySummary}))
+ if err != nil {
+ log.Error("fail to create update opts", err)
+ return opts, err
+ }
+ opts = append(opts, syncOpts...)
+ return opts, nil
}
func isExistSchemaID(service *pb.MicroService, schemas []*pb.Schema) bool {
@@ -127,13 +164,22 @@ func isExistSchemaID(service *pb.MicroService, schemas
[]*pb.Schema) bool {
return true
}
-func commitSchemaInfo(domainProject string, serviceID string, schema
*pb.Schema) []etcdadpt.OpOptions {
+func commitSchemaInfo(ctx context.Context, domainProject string, serviceID
string, schema *pb.Schema) ([]etcdadpt.OpOptions, error) {
if len(schema.Summary) != 0 {
- return schemaWithDatabaseOpera(etcdadpt.OpPut, domainProject,
serviceID, schema)
+ opts, err := putSchema(ctx, domainProject, serviceID, schema)
+ return opts, err
}
+ opts := make([]etcdadpt.OpOptions, 0)
key := path.GenerateServiceSchemaKey(domainProject, serviceID,
schema.SchemaId)
opt := etcdadpt.OpPut(etcdadpt.WithStrKey(key),
etcdadpt.WithStrValue(schema.Schema))
- return []etcdadpt.OpOptions{opt}
+ opts = append(opts, opt)
+ syncOpts, err := sync.GenUpdateOpts(ctx, datasource.ResourceKV,
schema.Schema, sync.WithOpts(map[string]string{"key": key}))
+ if err != nil {
+ log.Error("fail to create update opts", err)
+ return opts, err
+ }
+ opts = append(opts, syncOpts...)
+ return opts, nil
}
func getHeartbeatFunc(ctx context.Context, domainProject string,
instancesHbRst chan<- *pb.InstanceHbRst, element *pb.HeartbeatSetElement)
func(context.Context) {
diff --git a/datasource/etcd/util/dependency_util.go
b/datasource/etcd/util/dependency_util.go
index 601bc50..6065831 100644
--- a/datasource/etcd/util/dependency_util.go
+++ b/datasource/etcd/util/dependency_util.go
@@ -24,12 +24,15 @@ import (
"fmt"
"strings"
+ pb "github.com/go-chassis/cari/discovery"
+ "github.com/little-cui/etcdadpt"
+
+ "github.com/apache/servicecomb-service-center/datasource"
"github.com/apache/servicecomb-service-center/datasource/etcd/path"
"github.com/apache/servicecomb-service-center/datasource/etcd/sd"
+ esync
"github.com/apache/servicecomb-service-center/datasource/etcd/sync"
"github.com/apache/servicecomb-service-center/pkg/log"
"github.com/apache/servicecomb-service-center/pkg/util"
- pb "github.com/go-chassis/cari/discovery"
- "github.com/little-cui/etcdadpt"
)
func GetConsumerIds(ctx context.Context, domainProject string, provider
*pb.MicroService) ([]string, error) {
@@ -120,11 +123,19 @@ func AddServiceVersionRule(ctx context.Context,
domainProject string, consumer *
id := util.StringJoin([]string{provider.AppId, provider.ServiceName},
"_")
key := path.GenerateConsumerDependencyQueueKey(domainProject,
consumer.ServiceId, id)
- override, err := etcdadpt.InsertBytes(ctx, key, data)
+ opts := make([]etcdadpt.OpOptions, 0)
+ opts = append(opts, etcdadpt.OpPut(etcdadpt.WithStrKey(key),
etcdadpt.WithValue(data)))
+ syncOpts, err := esync.GenUpdateOpts(ctx, datasource.ResourceKV, data,
esync.WithOpts(map[string]string{"key": key}))
+ if err != nil {
+ log.Error("fail to create sync opts", err)
+ return pb.NewError(pb.ErrInternal, err.Error())
+ }
+ opts = append(opts, syncOpts...)
+ resp, err := etcdadpt.Instance().TxnWithCmp(ctx, opts,
etcdadpt.If(etcdadpt.NotExistKey(key)), nil)
if err != nil {
return err
}
- if override {
+ if resp.Succeeded {
log.Info(fmt.Sprintf("put in queue[%s/%s]:
consumer[%s/%s/%s/%s] -> provider[%s/%s/%s]", consumer.ServiceId, id,
consumer.Environment, consumer.AppId,
consumer.ServiceName, consumer.Version,
provider.Environment, provider.AppId,
provider.ServiceName))
diff --git a/datasource/etcd/util/microservice_util.go
b/datasource/etcd/util/microservice_util.go
index 997a6f5..ec596ec 100644
--- a/datasource/etcd/util/microservice_util.go
+++ b/datasource/etcd/util/microservice_util.go
@@ -23,15 +23,17 @@ import (
"fmt"
"strings"
+ pb "github.com/go-chassis/cari/discovery"
+ "github.com/little-cui/etcdadpt"
+
"github.com/apache/servicecomb-service-center/datasource"
"github.com/apache/servicecomb-service-center/datasource/etcd/path"
"github.com/apache/servicecomb-service-center/datasource/etcd/sd"
"github.com/apache/servicecomb-service-center/datasource/etcd/state/kvstore"
+ "github.com/apache/servicecomb-service-center/datasource/etcd/sync"
"github.com/apache/servicecomb-service-center/pkg/log"
"github.com/apache/servicecomb-service-center/pkg/util"
"github.com/apache/servicecomb-service-center/server/config"
- pb "github.com/go-chassis/cari/discovery"
- "github.com/little-cui/etcdadpt"
)
/*
@@ -237,16 +239,23 @@ func GetAllServiceUtil(ctx context.Context)
([]*pb.MicroService, error) {
return services, nil
}
-func UpdateService(domainProject string, serviceID string, service
*pb.MicroService) (opt etcdadpt.OpOptions, err error) {
- opt = etcdadpt.OpOptions{}
+func UpdateService(ctx context.Context, domainProject string, serviceID
string, service *pb.MicroService) ([]etcdadpt.OpOptions, error) {
+ opts := make([]etcdadpt.OpOptions, 0)
key := path.GenerateServiceKey(domainProject, serviceID)
data, err := json.Marshal(service)
if err != nil {
log.Error("marshal service file failed", err)
- return
+ return opts, err
}
- opt = etcdadpt.OpPut(etcdadpt.WithStrKey(key), etcdadpt.WithValue(data))
- return
+ opt := etcdadpt.OpPut(etcdadpt.WithStrKey(key),
etcdadpt.WithValue(data))
+ opts = append(opts, opt)
+ syncOpts, err := sync.GenUpdateOpts(ctx, datasource.ResourceKV, data,
sync.WithOpts(map[string]string{"key": key}))
+ if err != nil {
+ log.Error("fail to create update opts", err)
+ return opts, err
+ }
+ opts = append(opts, syncOpts...)
+ return opts, nil
}
func GetOneDomainProjectServiceCount(ctx context.Context, domainProject
string) (int64, error) {
diff --git a/server/service/rbac/rbac.go b/server/service/rbac/rbac.go
index 58047b2..7cc2ffd 100644
--- a/server/service/rbac/rbac.go
+++ b/server/service/rbac/rbac.go
@@ -23,14 +23,16 @@ import (
"errors"
"io/ioutil"
- "github.com/apache/servicecomb-service-center/pkg/log"
- "github.com/apache/servicecomb-service-center/server/config"
-
"github.com/apache/servicecomb-service-center/server/plugin/security/cipher"
"github.com/go-chassis/cari/pkg/errsvc"
"github.com/go-chassis/cari/rbac"
"github.com/go-chassis/go-archaius"
"github.com/go-chassis/go-chassis/v2/security/authr"
"github.com/go-chassis/go-chassis/v2/security/secret"
+
+ "github.com/apache/servicecomb-service-center/pkg/log"
+ "github.com/apache/servicecomb-service-center/server/config"
+
"github.com/apache/servicecomb-service-center/server/plugin/security/cipher"
+ "github.com/apache/servicecomb-service-center/server/service/sync"
)
const (
@@ -131,7 +133,8 @@ func initFirstTime() {
Roles: []string{rbac.RoleAdmin},
Password: pwd,
}
- err := CreateAccount(context.Background(), a)
+ ctx := sync.SetContext(context.Background())
+ err := CreateAccount(ctx, a)
if err == nil {
log.Info("root account init success")
return