This is an automated email from the ASF dual-hosted git repository.
tianxiaoliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/servicecomb-service-center.git
The following commit(s) were added to refs/heads/master by this push:
new 0c9258e [SCB-2094] implement mongo instance/dependence/engine
interface (#759)
0c9258e is described below
commit 0c9258e3761ba1838948c9a48bd3cfd12d8e5798
Author: xzccfzy <[email protected]>
AuthorDate: Wed Dec 2 08:56:21 2020 +0800
[SCB-2094] implement mongo instance/dependence/engine interface (#759)
---
datasource/dependency_util.go | 47 +
datasource/mongo/account.go | 11 +-
datasource/mongo/client/common.go | 1 +
datasource/mongo/client/mongo.go | 12 +-
datasource/mongo/common.go | 34 +
datasource/mongo/database.go | 61 +-
datasource/mongo/dep.go | 340 ++++++-
datasource/mongo/dep_test.go | 351 +++++++
datasource/mongo/engine.go | 213 +++-
datasource/mongo/heartbeat/common.go | 25 +
.../mongo/heartbeat/heartbeatchecker/heartbeat.go | 6 +-
.../heartbeat/heartbeatchecker/heartbeat_test.go | 10 +-
.../heartbeat/heartbeatchecker/heartbeatchecker.go | 5 +-
.../heartbeatchecker/heartbeatchecker_test.go | 2 +-
datasource/mongo/heartbeat/manager.go | 4 +-
datasource/mongo/mongo.go | 4 +-
datasource/mongo/ms.go | 1025 ++++++++++++++++++--
datasource/mongo/ms_test.go | 626 +++++++++---
datasource/mongo/{engine.go => util.go} | 35 +-
19 files changed, 2534 insertions(+), 278 deletions(-)
diff --git a/datasource/dependency_util.go b/datasource/dependency_util.go
new file mode 100644
index 0000000..c907f04
--- /dev/null
+++ b/datasource/dependency_util.go
@@ -0,0 +1,47 @@
+package datasource
+
+import (
+ "github.com/apache/servicecomb-service-center/datasource/etcd/path"
+ "github.com/apache/servicecomb-service-center/pkg/log"
+ pb "github.com/go-chassis/cari/discovery"
+)
+
+func ParamsChecker(consumerInfo *pb.MicroServiceKey, providersInfo
[]*pb.MicroServiceKey) *pb.CreateDependenciesResponse {
+ flag := make(map[string]bool, len(providersInfo))
+ for _, providerInfo := range providersInfo {
+ //存在带*的情况,后面的数据就不校验了
+ if providerInfo.ServiceName == "*" {
+ break
+ }
+ if len(providerInfo.AppId) == 0 {
+ providerInfo.AppId = consumerInfo.AppId
+ }
+
+ version := providerInfo.Version
+ if len(version) == 0 {
+ return BadParamsResponse("Required provider version")
+ }
+
+ providerInfo.Version = ""
+ if _, ok := flag[toString(providerInfo)]; ok {
+ return BadParamsResponse("Invalid request body for
provider info.Duplicate provider or (serviceName and appId is same).")
+ }
+ flag[toString(providerInfo)] = true
+ providerInfo.Version = version
+ }
+ return nil
+}
+
+// BadParamsResponse logs detailErr and wraps it in a CreateDependenciesResponse
+// whose Response carries pb.ErrInvalidParams.
+// An empty detailErr is replaced by a generic message in the response; the log
+// line is written before that substitution.
+func BadParamsResponse(detailErr string) *pb.CreateDependenciesResponse {
+ log.Errorf(nil, "request params is invalid. %s", detailErr)
+ if len(detailErr) == 0 {
+ detailErr = "Request params is invalid."
+ }
+ return &pb.CreateDependenciesResponse{
+ Response: pb.CreateResponse(pb.ErrInvalidParams, detailErr),
+ }
+}
+
+// toString renders a service key as the string produced by
+// path.GenerateProviderDependencyRuleKey for the key's own tenant.
+func toString(in *pb.MicroServiceKey) string {
+ return path.GenerateProviderDependencyRuleKey(in.Tenant, in)
+}
diff --git a/datasource/mongo/account.go b/datasource/mongo/account.go
index feba2af..ff57904 100644
--- a/datasource/mongo/account.go
+++ b/datasource/mongo/account.go
@@ -19,7 +19,6 @@ package mongo
import (
"context"
- "errors"
"github.com/apache/servicecomb-service-center/datasource"
"github.com/apache/servicecomb-service-center/datasource/mongo/client"
@@ -34,7 +33,7 @@ import (
func (ds *DataSource) CreateAccount(ctx context.Context, a *rbacframe.Account)
error {
exist, err := ds.AccountExist(ctx, a.Name)
if err != nil {
- log.Errorf(err, "can not save account info")
+ log.Error("can not save account info", err)
return err
}
if exist {
@@ -42,7 +41,7 @@ func (ds *DataSource) CreateAccount(ctx context.Context, a
*rbacframe.Account) e
}
hash, err := bcrypt.GenerateFromPassword([]byte(a.Password), 14)
if err != nil {
- log.Errorf(err, "pwd hash failed")
+ log.Error("pwd hash failed", err)
return err
}
a.Password = stringutil.Bytes2str(hash)
@@ -86,7 +85,7 @@ func (ds *DataSource) GetAccount(ctx context.Context, key
string) (*rbacframe.Ac
var account rbacframe.Account
err = result.Decode(&account)
if err != nil {
- log.Errorf(err, "Decode account failed: ")
+ log.Error("Decode account failed: ", err)
return nil, err
}
return &account, nil
@@ -106,7 +105,7 @@ func (ds *DataSource) ListAccount(ctx context.Context, key
string) ([]*rbacframe
var account rbacframe.Account
err = cursor.Decode(&account)
if err != nil {
- log.Errorf(err, "Decode account failed: ")
+ log.Error("Decode account failed: ", err)
break
}
accounts = append(accounts, &account)
@@ -142,7 +141,7 @@ func (ds *DataSource) UpdateAccount(ctx context.Context,
key string, account *rb
return err
}
if result.ModifiedCount == 0 {
- return errors.New("UpdateAccount: no data to update")
+ return ErrUpdateNodata
}
return nil
}
diff --git a/datasource/mongo/client/common.go
b/datasource/mongo/client/common.go
index a9b7f7d..e71706a 100644
--- a/datasource/mongo/client/common.go
+++ b/datasource/mongo/client/common.go
@@ -21,4 +21,5 @@ import (
var (
ErrCollectionsNil = errors.New("collection is nil")
+ ErrOpenDbFailed = errors.New("open db failed")
)
diff --git a/datasource/mongo/client/mongo.go b/datasource/mongo/client/mongo.go
index eabdb7d..4be189a 100644
--- a/datasource/mongo/client/mongo.go
+++ b/datasource/mongo/client/mongo.go
@@ -17,7 +17,7 @@ package client
import (
"context"
- "errors"
+ "fmt"
"time"
"github.com/apache/servicecomb-service-center/pkg/gopool"
@@ -55,7 +55,7 @@ func GetMongoClient() *MongoClient {
func NewMongoClient(config storage.Options) {
inst := &MongoClient{}
if err := inst.Initialize(config); err != nil {
- log.Errorf(err, "failed to init mongodb")
+ log.Error("failed to init mongodb", err)
inst.err <- err
}
mc = inst
@@ -86,7 +86,7 @@ func (mc *MongoClient) Ready() <-chan struct{} {
func (mc *MongoClient) Close() {
if mc.client != nil {
if err := mc.client.Disconnect(context.TODO()); err != nil {
- log.Errorf(err, "[close mongo client] failed disconnect
the mongo client")
+ log.Error("[close mongo client] failed disconnect the
mongo client", err)
}
}
}
@@ -107,7 +107,7 @@ func (mc *MongoClient) HealthCheck(ctx context.Context) {
if err == nil {
break
}
- log.Errorf(err, "retry to connect to mongodb %s
after %s", mc.dbconfig.URI, MongoCheckDelay)
+ log.Error(fmt.Sprintf("retry to connect to
mongodb %s after %s", mc.dbconfig.URI, MongoCheckDelay), err)
select {
case <-ctx.Done():
mc.Close()
@@ -124,13 +124,13 @@ func (mc *MongoClient) newClient(ctx context.Context)
(err error) {
mc.client, err = mongo.Connect(ctx, clientOptions)
if err != nil {
if derr := mc.client.Disconnect(ctx); derr != nil {
- log.Errorf(derr, "[init mongo client] failed to
disconnect mongo client ")
+ log.Error("[init mongo client] failed to disconnect
mongo clients", derr)
}
return
}
mc.db = mc.client.Database(MongoDB)
if mc.db == nil {
- return errors.New("open db failed")
+ return ErrOpenDbFailed
}
return nil
}
diff --git a/datasource/mongo/common.go b/datasource/mongo/common.go
new file mode 100644
index 0000000..8cab32c
--- /dev/null
+++ b/datasource/mongo/common.go
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package mongo
+
+import "errors"
+
+var (
+ ErrInvalidConsumer = errors.New("Invalid consumer")
+ ErrUpdateNodata = errors.New("UpdateAccount: no
data to update")
+ ErrServiceFileLost = errors.New("service center
service file lost")
+ ErrInvalidDomainProject = errors.New("invalid
domainProject")
+ ErrNotAllowDeleteSC = errors.New("not allow to
delete service center")
+ ErrDeleteSchemaFailed = errors.New("delete schema
failed")
+ ErrInvalidParamBatchGetInstancesRequest = errors.New("invalid param
BatchGetInstancesRequest")
+)
+
+// NewError returns an error whose message is errInfo immediately followed by
+// errMsg; no separator is inserted between the two parts.
+func NewError(errInfo string, errMsg string) error {
+ return errors.New(errInfo + errMsg)
+}
diff --git a/datasource/mongo/database.go b/datasource/mongo/database.go
index 0dc96f3..3bfdf25 100644
--- a/datasource/mongo/database.go
+++ b/datasource/mongo/database.go
@@ -32,8 +32,6 @@ const (
AccountTokenExpirationTime = "tokenexpirationtime"
AccountCurrentPassword = "currentpassword"
AccountStatus = "status"
- InstanceID = "instanceinfo.instanceid"
- ServiceID = "instanceinfo.serviceid"
RefreshTime = "refreshtime"
)
@@ -43,36 +41,41 @@ const (
CollectionSchema = "schema"
CollectionRule = "rule"
CollectionInstance = "instance"
+ CollectionDep = "dependency"
)
const (
+ DepsQueueUUID = "0"
ErrorDuplicateKey = 11000
)
const (
- Domain = "domain"
- Project = "project"
- ServiceTag = "tags"
- SchemaID = "schemaid"
- RuleServiceID = "serviceid"
- RuleRuleID = "ruleinfo.ruleid"
- SchemaServiceID = "serviceid"
- ServiceServiceID = "serviceinfo.serviceid"
- ServiceProperty = "serviceinfo.properties"
- ServiceModTime = "serviceinfo.modtimestamp"
- ServiceEnv = "serviceinfo.environment"
- ServiceAppID = "serviceinfo.appid"
- ServiceServiceName = "serviceinfo.servicename"
- ServiceAlias = "serviceinfo.alias"
- ServiceVersion = "serviceinfo.version"
- ServiceSchemas = "serviceinfo.schemas"
- RuleAttribute = "ruleinfo.attribute"
- RulePattern = "ruleinfo.pattern"
- RuleModTime = "ruleinfo.modtimestamp"
- RuleDescription = "ruleinfo.description"
- RuleRuletype = "ruleinfo.ruletype"
- SchemaInfo = "schemainfo"
- SchemaSummary = "schemasummary"
+ ColumnDomain = "domain"
+ ColumnProject = "project"
+ ColumnTag = "tags"
+ ColumnSchemaID = "schemaid"
+ ColumnServiceID = "serviceid"
+ ColumnRuleID = "ruleid"
+ ColumnServiceInfo = "serviceinfo"
+ ColumnProperty = "properties"
+ ColumnModTime = "modtimestamp"
+ ColumnEnv = "environment"
+ ColumnAppID = "appid"
+ ColumnServiceName = "servicename"
+ ColumnAlias = "alias"
+ ColumnVersion = "version"
+ ColumnSchemas = "schemas"
+ ColumnAttribute = "attribute"
+ ColumnPattern = "pattern"
+ ColumnDescription = "description"
+ ColumnRuleType = "ruletype"
+ ColumnSchemaInfo = "schemainfo"
+ ColumnSchemaSummary = "schemasummary"
+ ColumnConsumer = "consumer"
+ ColumnDependencyInfo = "dependencyinfo"
+ ColumnRuleInfo = "ruleinfo"
+ ColumnInstanceInfo = "instanceinfo"
+ ColumnInstanceID = "instanceid"
)
type Service struct {
@@ -104,3 +107,11 @@ type Instance struct {
RefreshTime time.Time
InstanceInfo *pb.MicroServiceInstance
}
+
+// Dependency is the record written to and decoded from the dependency
+// collection (CollectionDep).
+type Dependency struct {
+ // Domain and Project identify the tenant the record belongs to.
+ Domain string
+ Project string
+ // ConsumerID is the service ID of the consumer that declared the dependency.
+ ConsumerID string
+ // UUID is DepsQueueUUID for override requests and a generated UUID otherwise.
+ UUID string
+ // DependencyInfo is the consumer/providers declaration as received.
+ DependencyInfo *pb.ConsumerDependency
+}
diff --git a/datasource/mongo/dep.go b/datasource/mongo/dep.go
index e3322a2..6a1eef2 100644
--- a/datasource/mongo/dep.go
+++ b/datasource/mongo/dep.go
@@ -21,20 +21,354 @@ import (
"context"
pb "github.com/go-chassis/cari/discovery"
+ "go.mongodb.org/mongo-driver/bson"
+
+ "fmt"
+ "strings"
+
+ "github.com/apache/servicecomb-service-center/datasource"
+ "github.com/apache/servicecomb-service-center/datasource/mongo/client"
+ "github.com/apache/servicecomb-service-center/pkg/log"
+ "github.com/apache/servicecomb-service-center/pkg/util"
)
func (ds *DataSource) SearchProviderDependency(ctx context.Context, request
*pb.GetDependenciesRequest) (*pb.GetProDependenciesResponse, error) {
- return &pb.GetProDependenciesResponse{}, nil
+ providerServiceID := request.ServiceId
+ filter := GeneratorServiceFilter(ctx, providerServiceID)
+ provider, err := GetService(ctx, filter)
+ if err != nil {
+ log.Error("GetProviderDependencies failed, provider is
"+providerServiceID, err)
+ return nil, err
+ }
+ if provider == nil {
+ log.Error(fmt.Sprintf("GetProviderDependencies failed for
provider %s", providerServiceID), err)
+ return &pb.GetProDependenciesResponse{
+ Response: pb.CreateResponse(pb.ErrServiceNotExists,
"Provider does not exist"),
+ }, nil
+ }
+
+ services, err := GetDependencyProviders(ctx, provider.ServiceInfo,
request)
+ if err != nil {
+ log.Error(fmt.Sprintf("GetProviderDependencies failed, provider
is %s/%s/%s/%s",
+ provider.ServiceInfo.Environment,
provider.ServiceInfo.AppId, provider.ServiceInfo.ServiceName,
provider.ServiceInfo.Version), err)
+ return &pb.GetProDependenciesResponse{
+ Response: pb.CreateResponse(pb.ErrInternal,
err.Error()),
+ }, err
+ }
+
+ return &pb.GetProDependenciesResponse{
+ Response: pb.CreateResponse(pb.ResponseSuccess, "Get all
consumers successful."),
+ Consumers: services,
+ }, nil
}
func (ds *DataSource) SearchConsumerDependency(ctx context.Context, request
*pb.GetDependenciesRequest) (*pb.GetConDependenciesResponse, error) {
- return &pb.GetConDependenciesResponse{}, nil
+ consumerID := request.ServiceId
+
+ filter := GeneratorServiceFilter(ctx, consumerID)
+ consumer, err := GetService(ctx, filter)
+ if err != nil {
+ log.Error(fmt.Sprintf("GetConsumerDependencies failed, consumer
is %s", consumerID), err)
+ return nil, err
+ }
+ if consumer == nil {
+ log.Error(fmt.Sprintf("GetConsumerDependencies failed for
consumer %s does not exist", consumerID), err)
+ return &pb.GetConDependenciesResponse{
+ Response: pb.CreateResponse(pb.ErrServiceNotExists,
"Consumer does not exist"),
+ }, nil
+ }
+
+ services, err := GetDependencyProviders(ctx, consumer.ServiceInfo,
request)
+ if err != nil {
+ log.Error(fmt.Sprintf("GetConsumerDependencies failed, consumer
is %s/%s/%s/%s",
+ consumer.ServiceInfo.Environment,
consumer.ServiceInfo.AppId, consumer.ServiceInfo.ServiceName,
consumer.ServiceInfo.Version), err)
+ return &pb.GetConDependenciesResponse{
+ Response: pb.CreateResponse(pb.ErrInternal,
err.Error()),
+ }, err
+ }
+
+ return &pb.GetConDependenciesResponse{
+ Response: pb.CreateResponse(pb.ResponseSuccess, "Get all
providers successfully."),
+ Providers: services,
+ }, nil
}
func (ds *DataSource) AddOrUpdateDependencies(ctx context.Context,
dependencyInfos []*pb.ConsumerDependency, override bool) (*pb.Response, error) {
- return pb.CreateResponse(pb.ResponseSuccess, ""), nil
+ domainProject := util.ParseDomainProject(ctx)
+ for _, dependencyInfo := range dependencyInfos {
+ consumerFlag := util.StringJoin([]string{
+ dependencyInfo.Consumer.Environment,
+ dependencyInfo.Consumer.AppId,
+ dependencyInfo.Consumer.ServiceName,
+ dependencyInfo.Consumer.Version}, "/")
+ consumerInfo :=
pb.DependenciesToKeys([]*pb.MicroServiceKey{dependencyInfo.Consumer},
domainProject)[0]
+ providersInfo :=
pb.DependenciesToKeys(dependencyInfo.Providers, domainProject)
+
+ rsp := datasource.ParamsChecker(consumerInfo, providersInfo)
+ if rsp != nil {
+ log.Error(fmt.Sprintf("put request into dependency
queue failed, override: %t consumer is %s %s",
+ override, consumerFlag,
rsp.Response.GetMessage()), nil)
+ return rsp.Response, nil
+ }
+
+ consumerID, err := GetServiceID(ctx, consumerInfo)
+ if err != nil {
+ log.Error(fmt.Sprintf("put request into dependency
queue failed, override: %t, get consumer %s id failed",
+ override, consumerFlag), err)
+ return pb.CreateResponse(pb.ErrInternal, err.Error()),
err
+ }
+ if len(consumerID) == 0 {
+ log.Error(fmt.Sprintf("put request into dependency
queue failed, override: %t consumer %s does not exist",
+ override, consumerFlag), err)
+ return pb.CreateResponse(pb.ErrServiceNotExists,
fmt.Sprintf("Consumer %s does not exist.", consumerFlag)), nil
+ }
+
+ dependencyInfo.Override = override
+ id := DepsQueueUUID
+ if !override {
+ id = util.GenerateUUID()
+ }
+
+ domain := util.ParseDomain(ctx)
+ project := util.ParseProject(ctx)
+ data := &Dependency{
+ Domain: domain,
+ Project: project,
+ ConsumerID: consumerID,
+ UUID: id,
+ DependencyInfo: dependencyInfo,
+ }
+ insertRes, err := client.GetMongoClient().Insert(ctx,
CollectionDep, data)
+ if err != nil {
+ log.Error("failed to insert dep to mongodb", err)
+ return pb.CreateResponse(pb.ErrInternal, err.Error()),
err
+ }
+ log.Error(fmt.Sprintf("insert dep to mongodb success %s",
insertRes.InsertedID), err)
+ }
+ return pb.CreateResponse(pb.ResponseSuccess, "Create dependency
successfully."), nil
}
func (ds *DataSource) DeleteDependency() {
panic("implement me")
}
+
+func GetDependencyProviders(ctx context.Context, consumer *pb.MicroService,
request *pb.GetDependenciesRequest) ([]*pb.MicroService, error) {
+ keys, err := GetProviderKeys(ctx, consumer)
+ if err != nil {
+ return nil, err
+ }
+
+ services := make([]*pb.MicroService, 0, len(keys))
+
+ for _, key := range keys {
+ domainProject := util.ParseDomainProject(ctx)
+ if request.SameDomain && key.Tenant != domainProject {
+ continue
+ }
+
+ providerIDs, err := ParseDependencyRule(ctx, key)
+ if err != nil {
+ return nil, err
+ }
+
+ if key.ServiceName == "*" {
+ services = services[:0]
+ }
+
+ for _, providerID := range providerIDs {
+ filter := GeneratorServiceFilter(ctx, providerID)
+ provider, err := GetService(ctx, filter)
+ if err != nil {
+ log.Warn(fmt.Sprintf("get provider[%s/%s/%s/%s]
failed",
+ key.Environment, key.AppId,
key.ServiceName, key.Version))
+ continue
+ }
+ if provider == nil {
+ log.Warn(fmt.Sprintf("provider[%s/%s/%s/%s]
does not exist",
+ key.Environment, key.AppId,
key.ServiceName, key.Version))
+ continue
+ }
+ if request.NoSelf && providerID == consumer.ServiceId {
+ continue
+ }
+ services = append(services, provider.ServiceInfo)
+ }
+
+ if key.ServiceName == "*" {
+ break
+ }
+ }
+
+ return services, nil
+}
+
+// GetProviderKeys returns every provider key declared in the dependency records
+// that match consumer within the tenant carried by ctx.
+//
+// It returns ErrInvalidConsumer when consumer is nil, and any error reported by
+// the query, by decoding a record, or by the cursor iteration.
+func GetProviderKeys(ctx context.Context, consumer *pb.MicroService) ([]*pb.MicroServiceKey, error) {
+	if consumer == nil {
+		return nil, ErrInvalidConsumer
+	}
+	consumerKey := &pb.MicroServiceKey{
+		Tenant:      util.ParseDomainProject(ctx),
+		Environment: consumer.Environment,
+		AppId:       consumer.AppId,
+		ServiceName: consumer.ServiceName,
+		Alias:       consumer.Alias,
+		Version:     consumer.Version,
+	}
+
+	filter := GenerateConsumerDependencyRuleKey(ctx, consumerKey)
+	findRes, err := client.GetMongoClient().Find(ctx, CollectionDep, filter)
+	if err != nil {
+		return nil, err
+	}
+	// Release the cursor on every path, including early returns on decode errors.
+	defer func() {
+		if cerr := findRes.Close(ctx); cerr != nil {
+			log.Error("close dependency cursor failed", cerr)
+		}
+	}()
+
+	var services []*pb.MicroServiceKey
+	for findRes.Next(ctx) {
+		var dep Dependency
+		if err := findRes.Decode(&dep); err != nil {
+			return nil, err
+		}
+		// A record without dependency info contributes no providers.
+		if dep.DependencyInfo == nil {
+			continue
+		}
+		services = append(services, dep.DependencyInfo.Providers...)
+	}
+	// Next returning false may mean exhaustion or failure; tell the two apart.
+	if err := findRes.Err(); err != nil {
+		return nil, err
+	}
+	return services, nil
+}
+
+// GenerateConsumerDependencyRuleKey builds the query filter that selects the
+// dependency records of one consumer inside the domain/project taken from ctx.
+//
+//   - in == nil: only domain and project are constrained.
+//   - in.ServiceName == "*": the consumer environment is constrained as well.
+//   - otherwise: environment, app ID, service name and version must all match,
+//     so two consumers that share app ID and version but differ in name are
+//     never mixed up.
+func GenerateConsumerDependencyRuleKey(ctx context.Context, in *pb.MicroServiceKey) bson.M {
+	filter := bson.M{
+		ColumnDomain:  util.ParseDomain(ctx),
+		ColumnProject: util.ParseProject(ctx),
+	}
+	if in == nil {
+		return filter
+	}
+
+	// consumerColumn names a field of the consumer key nested in the record.
+	consumerColumn := func(column string) string {
+		return StringBuilder([]string{ColumnDependencyInfo, ColumnConsumer, column})
+	}
+
+	filter[consumerColumn(ColumnEnv)] = in.Environment
+	if in.ServiceName == "*" {
+		return filter
+	}
+	filter[consumerColumn(ColumnAppID)] = in.AppId
+	filter[consumerColumn(ColumnServiceName)] = in.ServiceName
+	filter[consumerColumn(ColumnVersion)] = in.Version
+	return filter
+}
+
+func ParseDependencyRule(ctx context.Context, dependencyRule
*pb.MicroServiceKey) (serviceIDs []string, err error) {
+ switch {
+ case dependencyRule.ServiceName == "*":
+ splited := strings.Split(dependencyRule.Tenant, "/")
+ filter := bson.M{
+ ColumnDomain: splited[0],
+ ColumnProject: splited[1],
+ StringBuilder([]string{ColumnServiceInfo, ColumnEnv}):
dependencyRule.Environment}
+ findRes, err := client.GetMongoClient().Find(ctx,
CollectionService, filter)
+ if err != nil {
+ return nil, err
+ }
+ for findRes.Next(ctx) {
+ var service Service
+ err = findRes.Decode(&service)
+ if err != nil {
+ return nil, err
+ }
+ serviceIDs = append(serviceIDs,
service.ServiceInfo.ServiceId)
+ }
+ default:
+ serviceIDs, err = FindServiceIds(ctx, dependencyRule)
+ }
+ return
+}
+
+// FindServiceIds resolves a provider key whose Version holds a version rule
+// into the IDs of the matching services. Every rule is restricted to the key's
+// tenant, environment, app ID and service name.
+//
+// Supported rules:
+//   - "latest":  no version constraint is added, so all versions of the
+//     provider are returned.
+//   - "x+":      versions greater than or equal to x.
+//   - "x-y":     versions in the half-open range [x, y).
+//   - otherwise: the exact version.
+//
+// Version bounds are passed to the query as strings; NOTE(review): this
+// presumably makes the comparison lexical rather than semantic — confirm.
+// An empty rule yields (nil, nil); a tenant that is not "domain/project"
+// yields ErrInvalidDomainProject.
+func FindServiceIds(ctx context.Context, key *pb.MicroServiceKey) ([]string, error) {
+	versionRule := key.Version
+	if len(versionRule) == 0 {
+		return nil, nil
+	}
+	tenant := strings.Split(key.Tenant, "/")
+	if len(tenant) < 2 {
+		return nil, ErrInvalidDomainProject
+	}
+
+	filter := bson.M{
+		ColumnDomain:  tenant[0],
+		ColumnProject: tenant[1],
+		StringBuilder([]string{ColumnServiceInfo, ColumnEnv}):         key.Environment,
+		StringBuilder([]string{ColumnServiceInfo, ColumnAppID}):       key.AppId,
+		StringBuilder([]string{ColumnServiceInfo, ColumnServiceName}): key.ServiceName,
+	}
+	versionColumn := StringBuilder([]string{ColumnServiceInfo, ColumnVersion})
+
+	rangeIdx := strings.Index(versionRule, "-")
+	switch {
+	case versionRule == "latest":
+		// No version constraint: every version of the provider qualifies.
+	case strings.HasSuffix(versionRule, "+"):
+		filter[versionColumn] = bson.M{"$gte": strings.TrimSuffix(versionRule, "+")}
+	case rangeIdx > 0:
+		filter[versionColumn] = bson.M{
+			"$gte": versionRule[:rangeIdx],
+			"$lt":  versionRule[rangeIdx+1:],
+		}
+	default:
+		filter[versionColumn] = versionRule
+	}
+	return GetFilterVersionService(ctx, filter)
+}
+
+// GetFilterVersionService runs filter m against the service collection and
+// returns the service IDs of all matching records.
+// It returns any error reported by the query, by decoding a record, or by the
+// cursor iteration.
+func GetFilterVersionService(ctx context.Context, m bson.M) (serviceIDs []string, err error) {
+	findRes, err := client.GetMongoClient().Find(ctx, CollectionService, m)
+	if err != nil {
+		return nil, err
+	}
+	// Release the cursor on every path, including early returns on decode errors.
+	defer func() {
+		if cerr := findRes.Close(ctx); cerr != nil {
+			log.Error("close service cursor failed", cerr)
+		}
+	}()
+
+	for findRes.Next(ctx) {
+		var service Service
+		if err = findRes.Decode(&service); err != nil {
+			return nil, err
+		}
+		// A record without service info has no ID to report.
+		if service.ServiceInfo == nil {
+			continue
+		}
+		serviceIDs = append(serviceIDs, service.ServiceInfo.ServiceId)
+	}
+	// Next returning false may mean exhaustion or failure; tell the two apart.
+	if err = findRes.Err(); err != nil {
+		return nil, err
+	}
+	return serviceIDs, nil
+}
+
+// GetServiceID returns the ID of the service that matches key (environment,
+// app ID, service name and version) inside the domain/project taken from ctx.
+//
+// It returns an empty ID with a nil error when no such service exists, and a
+// non-nil error when the query, the cursor or the decoding fails, so callers
+// can tell "not found" apart from a datastore failure.
+func GetServiceID(ctx context.Context, key *pb.MicroServiceKey) (serviceID string, err error) {
+	filter := bson.M{
+		ColumnDomain:  util.ParseDomain(ctx),
+		ColumnProject: util.ParseProject(ctx),
+		StringBuilder([]string{ColumnServiceInfo, ColumnEnv}):         key.Environment,
+		StringBuilder([]string{ColumnServiceInfo, ColumnAppID}):       key.AppId,
+		StringBuilder([]string{ColumnServiceInfo, ColumnServiceName}): key.ServiceName,
+		StringBuilder([]string{ColumnServiceInfo, ColumnVersion}):     key.Version,
+	}
+
+	findRes, err := client.GetMongoClient().Find(ctx, CollectionService, filter)
+	if err != nil {
+		return "", err
+	}
+	defer func() {
+		if cerr := findRes.Close(ctx); cerr != nil {
+			log.Error("close service cursor failed", cerr)
+		}
+	}()
+
+	// Only the first match is needed; an exhausted cursor means "not found"
+	// unless the cursor itself reports a failure.
+	if !findRes.Next(ctx) {
+		return "", findRes.Err()
+	}
+	var service Service
+	if err = findRes.Decode(&service); err != nil {
+		return "", err
+	}
+	if service.ServiceInfo == nil {
+		return "", nil
+	}
+	return service.ServiceInfo.ServiceId, nil
+}
diff --git a/datasource/mongo/dep_test.go b/datasource/mongo/dep_test.go
new file mode 100644
index 0000000..77bdc06
--- /dev/null
+++ b/datasource/mongo/dep_test.go
@@ -0,0 +1,351 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+ */
+
+package mongo_test
+
+import (
+ "testing"
+
+ "github.com/apache/servicecomb-service-center/datasource"
+ pb "github.com/go-chassis/cari/discovery"
+ "github.com/stretchr/testify/assert"
+)
+
+func TestDep_Creat(t *testing.T) {
+
+ var (
+ consumerId1 string
+ consumerId3 string
+ )
+
+ t.Run("creat service, when request is valid, should not pass", func(t
*testing.T) {
+ respCreateService, err :=
datasource.Instance().RegisterService(getContext(), &pb.CreateServiceRequest{
+ Service: &pb.MicroService{
+ ServiceId: "dep1",
+ AppId: "create_dep_group",
+ ServiceName: "create_dep_consumer",
+ Version: "1.0.0",
+ Level: "FRONT",
+ Status: pb.MS_UP,
+ },
+ })
+ assert.NoError(t, err)
+ assert.Equal(t, pb.ResponseSuccess,
respCreateService.Response.GetCode())
+ consumerId1 = respCreateService.ServiceId
+
+ respCreateService, err =
datasource.Instance().RegisterService(getContext(), &pb.CreateServiceRequest{
+ Service: &pb.MicroService{
+ ServiceId: "dep2",
+ AppId: "create_dep_group",
+ ServiceName: "create_dep_consumer_all",
+ Version: "1.0.0",
+ Level: "FRONT",
+ Status: pb.MS_UP,
+ },
+ })
+ assert.NoError(t, err)
+ assert.Equal(t, pb.ResponseSuccess,
respCreateService.Response.GetCode())
+ consumerId3 = respCreateService.ServiceId
+
+ respCreateService, err =
datasource.Instance().RegisterService(getContext(), &pb.CreateServiceRequest{
+ Service: &pb.MicroService{
+ ServiceId: "dep3",
+ Environment: pb.ENV_PROD,
+ AppId: "create_dep_group",
+ ServiceName: "create_dep_consumer",
+ Version: "1.0.0",
+ Level: "FRONT",
+ Status: pb.MS_UP,
+ },
+ })
+ assert.NoError(t, err)
+ assert.Equal(t, pb.ResponseSuccess,
respCreateService.Response.GetCode())
+
+ respCreateService, err =
datasource.Instance().RegisterService(getContext(), &pb.CreateServiceRequest{
+ Service: &pb.MicroService{
+ ServiceId: "dep4",
+ AppId: "create_dep_group",
+ ServiceName: "create_dep_provider",
+ Version: "1.0.0",
+ Level: "FRONT",
+ Status: pb.MS_UP,
+ },
+ })
+ assert.NoError(t, err)
+ assert.Equal(t, pb.ResponseSuccess,
respCreateService.Response.GetCode())
+
+ respCreateService, err =
datasource.Instance().RegisterService(getContext(), &pb.CreateServiceRequest{
+ Service: &pb.MicroService{
+ ServiceId: "dep5",
+ AppId: "create_dep_group",
+ ServiceName: "create_dep_provider",
+ Version: "1.0.1",
+ Level: "FRONT",
+ Status: pb.MS_UP,
+ },
+ })
+ assert.NoError(t, err)
+ assert.Equal(t, pb.ResponseSuccess,
respCreateService.Response.GetCode())
+
+ respCreateService, err =
datasource.Instance().RegisterService(getContext(), &pb.CreateServiceRequest{
+ Service: &pb.MicroService{
+ ServiceId: "dep6",
+ Environment: pb.ENV_PROD,
+ AppId: "create_dep_group",
+ ServiceName: "create_dep_provider",
+ Version: "1.0.0",
+ Level: "FRONT",
+ Status: pb.MS_UP,
+ },
+ })
+ assert.NoError(t, err)
+ assert.Equal(t, pb.ResponseSuccess,
respCreateService.Response.GetCode())
+ })
+
+ t.Run("create dep, when request is valid, should be passed", func(t
*testing.T) {
+
+ respCreateDependency, err :=
datasource.Instance().AddOrUpdateDependencies(getContext(),
[]*pb.ConsumerDependency{
+ {
+ Consumer: &pb.MicroServiceKey{
+ ServiceName: "create_dep_consumer",
+ AppId: "create_dep_group",
+ Version: "1.0.0",
+ },
+ Providers: []*pb.MicroServiceKey{
+ {
+ AppId: "create_dep_group",
+ ServiceName:
"create_dep_provider",
+ Version: "latest",
+ },
+ },
+ },
+ }, false)
+ assert.NoError(t, err)
+ assert.Equal(t, pb.ResponseSuccess,
respCreateDependency.GetCode())
+
+ respPro, err :=
datasource.Instance().SearchConsumerDependency(getContext(),
&pb.GetDependenciesRequest{
+ ServiceId: consumerId1,
+ })
+ assert.NoError(t, err)
+ assert.Equal(t, pb.ResponseSuccess, respPro.Response.GetCode())
+ assert.NotEqual(t, "1.0.1", respPro.Providers[0].Version)
+
+ respCreateDependency, err =
datasource.Instance().AddOrUpdateDependencies(getContext(),
[]*pb.ConsumerDependency{
+ {
+ Consumer: &pb.MicroServiceKey{
+ ServiceName: "create_dep_consumer",
+ AppId: "create_dep_group",
+ Version: "1.0.0",
+ },
+ Providers: []*pb.MicroServiceKey{
+ {
+ AppId: "create_dep_group",
+ ServiceName:
"create_dep_provider",
+ Version: "1.0.0+",
+ },
+ },
+ },
+ }, false)
+ assert.NoError(t, err)
+ assert.Equal(t, pb.ResponseSuccess,
respCreateDependency.GetCode())
+
+ respPro, err =
datasource.Instance().SearchConsumerDependency(getContext(),
&pb.GetDependenciesRequest{
+ ServiceId: consumerId1,
+ })
+ assert.NoError(t, err)
+ assert.Equal(t, pb.ResponseSuccess, respPro.Response.GetCode())
+ assert.NotEqual(t, "1.0.1", respPro.Providers[0].Version)
+
+ respCreateDependency, err =
datasource.Instance().AddOrUpdateDependencies(getContext(),
[]*pb.ConsumerDependency{
+ {
+ Consumer: &pb.MicroServiceKey{
+ ServiceName: "create_dep_consumer",
+ AppId: "create_dep_group",
+ Version: "1.0.0",
+ },
+ Providers: []*pb.MicroServiceKey{
+ {
+ AppId: "create_dep_group",
+ ServiceName:
"create_dep_provider",
+ Version: "1.0.0+",
+ },
+ },
+ },
+ }, false)
+ assert.NoError(t, err)
+ assert.Equal(t, pb.ResponseSuccess,
respCreateDependency.GetCode())
+
+ respPro, err =
datasource.Instance().SearchConsumerDependency(getContext(),
&pb.GetDependenciesRequest{
+ ServiceId: consumerId1,
+ })
+ assert.NoError(t, err)
+ assert.Equal(t, pb.ResponseSuccess, respPro.Response.GetCode())
+ assert.NotEqual(t, 2, len(respPro.Providers))
+
+ respCreateDependency, err =
datasource.Instance().AddOrUpdateDependencies(getContext(),
[]*pb.ConsumerDependency{
+ {
+ Consumer: &pb.MicroServiceKey{
+ ServiceName: "create_dep_consumer_all",
+ AppId: "create_dep_group",
+ Version: "1.0.0",
+ },
+ Providers: []*pb.MicroServiceKey{
+ {
+ ServiceName: "*",
+ },
+ },
+ },
+ }, false)
+ assert.NoError(t, err)
+ assert.Equal(t, pb.ResponseSuccess,
respCreateDependency.GetCode())
+
+ respPro, err =
datasource.Instance().SearchConsumerDependency(getContext(),
&pb.GetDependenciesRequest{
+ ServiceId: consumerId3,
+ })
+ assert.NoError(t, err)
+ assert.Equal(t, pb.ResponseSuccess, respPro.Response.GetCode())
+ assert.NotEqual(t, 0, len(respPro.Providers))
+
+ respCreateDependency, err =
datasource.Instance().AddOrUpdateDependencies(getContext(),
[]*pb.ConsumerDependency{
+ {
+ Consumer: &pb.MicroServiceKey{
+ ServiceName: "create_dep_consumer_all",
+ AppId: "create_dep_group",
+ Version: "1.0.0",
+ },
+ Providers: nil,
+ },
+ }, false)
+ assert.NoError(t, err)
+ assert.Equal(t, pb.ResponseSuccess,
respCreateDependency.GetCode())
+
+ respCreateDependency, err =
datasource.Instance().AddOrUpdateDependencies(getContext(),
[]*pb.ConsumerDependency{
+ {
+ Consumer: &pb.MicroServiceKey{
+ ServiceName: "create_dep_consumer",
+ AppId: "create_dep_group",
+ Version: "1.0.0",
+ },
+ Providers: []*pb.MicroServiceKey{
+ {
+ AppId: "create_dep_group",
+ ServiceName:
"create_dep_provider",
+ Version: "1.0.0",
+ },
+ {
+ ServiceName: "*",
+ },
+ },
+ },
+ }, false)
+ assert.NoError(t, err)
+ assert.Equal(t, pb.ResponseSuccess,
respCreateDependency.GetCode())
+
+ respCreateDependency, err =
datasource.Instance().AddOrUpdateDependencies(getContext(),
[]*pb.ConsumerDependency{
+ {
+ Consumer: &pb.MicroServiceKey{
+ ServiceName: "create_dep_consumer",
+ AppId: "create_dep_group",
+ Version: "1.0.0",
+ },
+ Providers: []*pb.MicroServiceKey{
+ {
+ AppId: "create_dep_group",
+ ServiceName:
"create_dep_provider",
+ Version: "1.0.0-1.0.1",
+ },
+ },
+ },
+ }, false)
+ assert.NoError(t, err)
+ assert.Equal(t, pb.ResponseSuccess,
respCreateDependency.GetCode())
+
+ respPro, err =
datasource.Instance().SearchConsumerDependency(getContext(),
&pb.GetDependenciesRequest{
+ ServiceId: consumerId1,
+ })
+ assert.NoError(t, err)
+ assert.Equal(t, pb.ResponseSuccess, respPro.Response.GetCode())
+ assert.Equal(t, "1.0.0", respPro.Providers[0].Version)
+ })
+}
+
+func TestDep_Get(t *testing.T) {
+
+ var (
+ consumerId1 string
+ providerId1 string
+ )
+
+ t.Run("create service, when request is valid, should be passed", func(t
*testing.T) {
+ respCreateService, err :=
datasource.Instance().RegisterService(getContext(), &pb.CreateServiceRequest{
+ Service: &pb.MicroService{
+ ServiceId: "dep7",
+ AppId: "get_dep_group",
+ ServiceName: "get_dep_consumer",
+ Version: "1.0.0",
+ Level: "FRONT",
+ Status: pb.MS_UP,
+ },
+ })
+ assert.NoError(t, err)
+ assert.Equal(t, pb.ResponseSuccess,
respCreateService.Response.GetCode())
+ consumerId1 = respCreateService.ServiceId
+
+ respCreateService, err =
datasource.Instance().RegisterService(getContext(), &pb.CreateServiceRequest{
+ Service: &pb.MicroService{
+ ServiceId: "dep8",
+ AppId: "get_dep_group",
+ ServiceName: "get_dep_provider",
+ Version: "1.0.0",
+ Level: "FRONT",
+ Status: pb.MS_UP,
+ },
+ })
+ assert.NoError(t, err)
+ assert.Equal(t, pb.ResponseSuccess,
respCreateService.Response.GetCode())
+ providerId1 = respCreateService.ServiceId
+
+ respCreateService, err =
datasource.Instance().RegisterService(getContext(), &pb.CreateServiceRequest{
+ Service: &pb.MicroService{
+ ServiceId: "dep9",
+ AppId: "get_dep_group",
+ ServiceName: "get_dep_provider",
+ Version: "2.0.0",
+ Level: "FRONT",
+ Status: pb.MS_UP,
+ },
+ })
+ assert.NoError(t, err)
+ assert.Equal(t, pb.ResponseSuccess,
respCreateService.Response.GetCode())
+ })
+
+ t.Run("execute 'search dep' operation, when request is valid,should be
passed", func(t *testing.T) {
+ respPro, err :=
datasource.Instance().SearchProviderDependency(getContext(),
&pb.GetDependenciesRequest{
+ ServiceId: providerId1,
+ })
+ assert.NoError(t, err)
+ assert.Equal(t, pb.ResponseSuccess, respPro.Response.GetCode())
+
+ respCon, err :=
datasource.Instance().SearchConsumerDependency(getContext(),
&pb.GetDependenciesRequest{
+ ServiceId: consumerId1,
+ })
+
+ assert.NoError(t, err)
+ assert.Equal(t, pb.ResponseSuccess, respCon.Response.GetCode())
+
+ })
+}
diff --git a/datasource/mongo/engine.go b/datasource/mongo/engine.go
index 246ce78..e7b799a 100644
--- a/datasource/mongo/engine.go
+++ b/datasource/mongo/engine.go
@@ -22,17 +22,114 @@ import (
"time"
"github.com/apache/servicecomb-service-center/pkg/cluster"
+
+ "fmt"
+ "strconv"
+ "strings"
+
+ "github.com/apache/servicecomb-service-center/datasource"
+ "github.com/apache/servicecomb-service-center/datasource/etcd/path"
+ "github.com/apache/servicecomb-service-center/datasource/mongo/client"
+ "github.com/apache/servicecomb-service-center/pkg/log"
+ "github.com/apache/servicecomb-service-center/pkg/util"
+ "github.com/apache/servicecomb-service-center/server/core"
+ "github.com/apache/servicecomb-service-center/server/metrics"
+ pb "github.com/go-chassis/cari/discovery"
+ "go.mongodb.org/mongo-driver/bson"
)
func (ds *DataSource) SelfRegister(ctx context.Context) error {
- return nil
+ err := ds.registryService(ctx)
+ if err != nil {
+ return err
+ }
+
+ // instance info
+ err = ds.registryInstance(ctx)
+
+ // wait heartbeat
+ ds.autoSelfHeartBeat()
+
+ metrics.ReportScInstance()
+ return err
}
func (ds *DataSource) SelfUnregister(ctx context.Context) error {
+ if len(core.Instance.InstanceId) == 0 {
+ return nil
+ }
+
+ ctx = core.AddDefaultContextValue(ctx)
+ respI, err := datasource.Instance().UnregisterInstance(ctx,
core.UnregisterInstanceRequest())
+ if err != nil {
+ log.Error("unregister failed", err)
+ return err
+ }
+ if respI.Response.GetCode() != pb.ResponseSuccess {
+ err = fmt.Errorf("unregister service center instance[%s/%s]
failed, %s",
+ core.Instance.ServiceId, core.Instance.InstanceId,
respI.Response.GetMessage())
+ log.Error(err.Error(), nil)
+ return err
+ }
+ log.Warn(fmt.Sprintf("unregister service center instance[%s/%s]",
+ core.Service.ServiceId, core.Instance.InstanceId))
return nil
}
// OPS
func (ds *DataSource) ClearNoInstanceServices(ctx context.Context, ttl
time.Duration) error {
+ services, err := GetAllServicesAcrossDomainProject(ctx)
+ if err != nil {
+ return err
+ }
+ if len(services) == 0 {
+ log.Info("no service found, no need to clear")
+ return nil
+ }
+
+ timeLimit := time.Now().Add(0 - ttl)
+ log.Info(fmt.Sprintf("clear no-instance services created before %s",
timeLimit))
+ timeLimitStamp := strconv.FormatInt(timeLimit.Unix(), 10)
+
+ for domainProject, svcList := range services {
+ if len(svcList) == 0 {
+ continue
+ }
+ ctx, err := ctxFromDomainProject(ctx, domainProject)
+ if err != nil {
+ log.Error("get domain project context failed", err)
+ continue
+ }
+ for _, svc := range svcList {
+ if svc == nil {
+ continue
+ }
+ ok, err := shouldClear(ctx, timeLimitStamp, svc)
+ if err != nil {
+ log.Error("check service clear necessity
failed", err)
+ continue
+ }
+ if !ok {
+ continue
+ }
+ svcCtxStr := "domainProject: " + domainProject + ", " +
+ "env: " + svc.Environment + ", " +
+ "service: " +
util.StringJoin([]string{svc.AppId, svc.ServiceName, svc.Version}, path.SPLIT)
+ delSvcReq := &pb.DeleteServiceRequest{
+ ServiceId: svc.ServiceId,
+ Force: true, //force delete
+ }
+ delSvcResp, err :=
datasource.Instance().UnregisterService(ctx, delSvcReq)
+ if err != nil {
+ log.Error(fmt.Sprintf("clear service failed,
%s", svcCtxStr), err)
+ continue
+ }
+ if delSvcResp.Response.GetCode() != pb.ResponseSuccess {
+ log.Error(fmt.Sprintf("clear service failed %s
%s", delSvcResp.Response.GetMessage(), svcCtxStr), err)
+ continue
+ }
+ log.Warn(fmt.Sprintf("clear service success, %s",
svcCtxStr))
+ }
+ }
return nil
}
@@ -43,3 +140,117 @@ func (ds *DataSource) UpgradeVersion(ctx context.Context)
error {
func (ds *DataSource) GetClusters(ctx context.Context) (cluster.Clusters,
error) {
return nil, nil
}
+
+func (ds *DataSource) registryService(pCtx context.Context) error {
+ ctx := core.AddDefaultContextValue(pCtx)
+ respE, err := datasource.Instance().ExistService(ctx,
core.GetExistenceRequest())
+ if err != nil {
+ log.Error("query service center existence failed", err)
+ return err
+ }
+ if respE.Response.GetCode() == pb.ResponseSuccess {
+ log.Warn(fmt.Sprintf("service center service[%s] already
registered", respE.ServiceId))
+ respG, err := datasource.Instance().GetService(ctx,
core.GetServiceRequest(respE.ServiceId))
+ if respG.Response.GetCode() != pb.ResponseSuccess {
+ log.Error(fmt.Sprintf("query service center service[%s]
info failed", respE.ServiceId), err)
+ return ErrServiceFileLost
+ }
+ core.Service = respG.Service
+ return nil
+ }
+
+ respS, err := datasource.Instance().RegisterService(ctx,
core.CreateServiceRequest())
+ if err != nil {
+ log.Error("register service center failed", err)
+ return err
+ }
+ core.Service.ServiceId = respS.ServiceId
+ log.Info(fmt.Sprintf("register service center service[%s]",
respS.ServiceId))
+ return nil
+}
+
+func (ds *DataSource) registryInstance(pCtx context.Context) error {
+ core.Instance.InstanceId = ""
+ core.Instance.ServiceId = core.Service.ServiceId
+
+ ctx := core.AddDefaultContextValue(pCtx)
+
+ respI, err := datasource.Instance().RegisterInstance(ctx,
core.RegisterInstanceRequest())
+ if err != nil {
+ log.Error("register failed", err)
+ return err
+ }
+ if respI.Response.GetCode() != pb.ResponseSuccess {
+ err = fmt.Errorf("register service center[%s] instance failed,
%s",
+ core.Instance.ServiceId, respI.Response.GetMessage())
+ log.Error(err.Error(), nil)
+ return err
+ }
+ core.Instance.InstanceId = respI.InstanceId
+ log.Info(fmt.Sprintf("register service center instance[%s/%s],
endpoints is %s",
+ core.Service.ServiceId, respI.InstanceId,
core.Instance.Endpoints))
+ return nil
+}
+
+func (ds *DataSource) autoSelfHeartBeat() {
+ //todo
+}
+
+func GetAllServicesAcrossDomainProject(ctx context.Context)
(map[string][]*pb.MicroService, error) {
+ domain := util.ParseDomain(ctx)
+ project := util.ParseProject(ctx)
+
+ filter := bson.M{"domain": domain, "project": project}
+
+ findRes, err := client.GetMongoClient().Find(ctx, CollectionService,
filter)
+ if err != nil {
+ return nil, err
+ }
+
+ services := make(map[string][]*pb.MicroService)
+
+ for findRes.Next(ctx) {
+ var mongoService Service
+ err := findRes.Decode(&mongoService)
+ if err != nil {
+ return nil, err
+ }
+ domainProject := mongoService.Domain + "/" +
mongoService.Project
+ services[domainProject] = append(services[domainProject],
mongoService.ServiceInfo)
+ }
+ return services, nil
+}
+
+func ctxFromDomainProject(pCtx context.Context, domainProject string) (ctx
context.Context, err error) {
+ splitIndex := strings.Index(domainProject, path.SPLIT)
+ if splitIndex == -1 {
+ return nil, NewError("invalid domainProject: ", domainProject)
+ }
+ domain := domainProject[:splitIndex]
+ project := domainProject[splitIndex+1:]
+ return util.SetDomainProject(pCtx, domain, project), nil
+}
+
+func shouldClear(ctx context.Context, timeLimitStamp string, svc
*pb.MicroService) (bool, error) {
+ if svc.Timestamp > timeLimitStamp {
+ return false, nil
+ }
+
+ getInstsReq := &pb.GetInstancesRequest{
+ ConsumerServiceId: svc.ServiceId,
+ ProviderServiceId: svc.ServiceId,
+ }
+
+ getInstsResp, err := datasource.Instance().GetInstances(ctx,
getInstsReq)
+ if err != nil {
+ return false, err
+ }
+ if getInstsResp.Response.GetCode() != pb.ResponseSuccess {
+ return false, NewError("get instance failed: ",
getInstsResp.Response.GetMessage())
+ }
+ //ignore a service if it has instances
+ if len(getInstsResp.Instances) > 0 {
+ return false, nil
+ }
+ return true, nil
+}
diff --git a/datasource/mongo/heartbeat/common.go
b/datasource/mongo/heartbeat/common.go
new file mode 100644
index 0000000..9d5cb31
--- /dev/null
+++ b/datasource/mongo/heartbeat/common.go
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package heartbeat
+
+import "errors"
+
+var (
+ ErrPluginNameNil = errors.New("plugin implement name is nil")
+ ErrPluginNotSupport = errors.New("plugin implement not supported
[#{opts.PluginImplName}]")
+)
diff --git a/datasource/mongo/heartbeat/heartbeatchecker/heartbeat.go
b/datasource/mongo/heartbeat/heartbeatchecker/heartbeat.go
index f87d08c..57e50f0 100644
--- a/datasource/mongo/heartbeat/heartbeatchecker/heartbeat.go
+++ b/datasource/mongo/heartbeat/heartbeatchecker/heartbeat.go
@@ -29,15 +29,15 @@ import (
func updateInstanceRefreshTime(ctx context.Context, serviceID string,
instanceID string) error {
filter := bson.M{
- mongo.InstanceID: instanceID,
- mongo.ServiceID: serviceID,
+ mongo.StringBuilder([]string{mongo.ColumnInstanceInfo,
mongo.ColumnInstanceID}): instanceID,
+ mongo.StringBuilder([]string{mongo.ColumnInstanceInfo,
mongo.ColumnServiceID}): serviceID,
}
update := bson.M{
"$set": bson.M{mongo.RefreshTime: time.Now()},
}
result, err := client.GetMongoClient().FindOneAndUpdate(ctx,
mongo.CollectionInstance, filter, update)
if err != nil {
- log.Errorf(err, "failed to update refresh time of instance: ")
+ log.Error("failed to update refresh time of instance: ", err)
return err
}
if result.Err() != nil {
diff --git a/datasource/mongo/heartbeat/heartbeatchecker/heartbeat_test.go
b/datasource/mongo/heartbeat/heartbeatchecker/heartbeat_test.go
index 5ef1bc9..f77cd8e 100644
--- a/datasource/mongo/heartbeat/heartbeatchecker/heartbeat_test.go
+++ b/datasource/mongo/heartbeat/heartbeatchecker/heartbeat_test.go
@@ -19,12 +19,12 @@ package heartbeatchecker
import (
"context"
- "fmt"
"testing"
"time"
"github.com/apache/servicecomb-service-center/datasource/mongo"
"github.com/apache/servicecomb-service-center/datasource/mongo/client"
+ "github.com/apache/servicecomb-service-center/pkg/log"
pb "github.com/go-chassis/cari/discovery"
"github.com/go-chassis/go-chassis/v2/storage"
"github.com/stretchr/testify/assert"
@@ -41,7 +41,7 @@ func init() {
func TestUpdateInstanceRefreshTime(t *testing.T) {
t.Run("update instance refresh time: if the instance does not exist,the
update should fail", func(t *testing.T) {
err := updateInstanceRefreshTime(context.Background(),
"not-exist", "not-exist")
- fmt.Println(err)
+ log.Error("", err)
assert.NotNil(t, err)
})
@@ -58,8 +58,8 @@ func TestUpdateInstanceRefreshTime(t *testing.T) {
err = updateInstanceRefreshTime(context.Background(),
instance1.InstanceInfo.ServiceId, instance1.InstanceInfo.InstanceId)
assert.Equal(t, nil, err)
filter := bson.M{
- mongo.InstanceID: instance1.InstanceInfo.InstanceId,
- mongo.ServiceID: instance1.InstanceInfo.ServiceId,
+ mongo.StringBuilder([]string{mongo.ColumnInstanceInfo,
mongo.ColumnInstanceID}): instance1.InstanceInfo.InstanceId,
+ mongo.StringBuilder([]string{mongo.ColumnInstanceInfo,
mongo.ColumnServiceID}): instance1.InstanceInfo.ServiceId,
}
result, err :=
client.GetMongoClient().FindOne(context.Background(), mongo.CollectionInstance,
filter)
assert.Nil(t, err)
@@ -68,7 +68,7 @@ func TestUpdateInstanceRefreshTime(t *testing.T) {
assert.Nil(t, err)
assert.NotEqual(t, instance1.RefreshTime, ins.RefreshTime)
filter = bson.M{
- mongo.InstanceID: instance1.InstanceInfo.InstanceId,
+ mongo.StringBuilder([]string{mongo.ColumnInstanceInfo,
mongo.ColumnInstanceID}): instance1.InstanceInfo.InstanceId,
}
_, err = client.GetMongoClient().Delete(context.Background(),
mongo.CollectionInstance, filter)
assert.Nil(t, err)
diff --git a/datasource/mongo/heartbeat/heartbeatchecker/heartbeatchecker.go
b/datasource/mongo/heartbeat/heartbeatchecker/heartbeatchecker.go
index 822ed82..fad3ec0 100644
--- a/datasource/mongo/heartbeat/heartbeatchecker/heartbeatchecker.go
+++ b/datasource/mongo/heartbeat/heartbeatchecker/heartbeatchecker.go
@@ -24,6 +24,8 @@ import (
"github.com/apache/servicecomb-service-center/pkg/log"
"github.com/apache/servicecomb-service-center/pkg/util"
pb "github.com/go-chassis/cari/discovery"
+
+ "fmt"
)
func init() {
@@ -41,8 +43,7 @@ func (h *HeartBeatChecker) Heartbeat(ctx context.Context,
request *pb.HeartbeatR
remoteIP := util.GetIPFromContext(ctx)
err := updateInstanceRefreshTime(ctx, request.ServiceId,
request.InstanceId)
if err != nil {
- log.Errorf(err, "heartbeat failed, instance[%s]. operator %s",
- request.InstanceId, remoteIP)
+ log.Error(fmt.Sprintf("heartbeat failed, instance[%s]. operator
%s", request.InstanceId, remoteIP), err)
resp := &pb.HeartbeatResponse{
Response:
pb.CreateResponseWithSCErr(pb.NewError(pb.ErrInstanceNotExists, err.Error())),
}
diff --git
a/datasource/mongo/heartbeat/heartbeatchecker/heartbeatchecker_test.go
b/datasource/mongo/heartbeat/heartbeatchecker/heartbeatchecker_test.go
index a519240..ffe82b5 100644
--- a/datasource/mongo/heartbeat/heartbeatchecker/heartbeatchecker_test.go
+++ b/datasource/mongo/heartbeat/heartbeatchecker/heartbeatchecker_test.go
@@ -58,7 +58,7 @@ func TestHeartbeat(t *testing.T) {
assert.Nil(t, err)
assert.Equal(t, pb.ResponseSuccess, resp.Response.GetCode())
filter := bson.M{
- mongo.InstanceID: instance1.InstanceInfo.InstanceId,
+ mongo.StringBuilder([]string{mongo.ColumnInstanceInfo,
mongo.ColumnInstanceID}): instance1.InstanceInfo.InstanceId,
}
_, err = client.GetMongoClient().Delete(context.Background(),
mongo.CollectionInstance, filter)
assert.Nil(t, err)
diff --git a/datasource/mongo/heartbeat/manager.go
b/datasource/mongo/heartbeat/manager.go
index 4a1e46f..279e72a 100644
--- a/datasource/mongo/heartbeat/manager.go
+++ b/datasource/mongo/heartbeat/manager.go
@@ -48,11 +48,11 @@ func Init(opts Options) error {
func New(opts Options) (HealthCheck, error) {
if opts.PluginImplName == "" {
- return nil, fmt.Errorf("plugin implement name is nil")
+ return nil, ErrPluginNameNil
}
f, ok := plugins[opts.PluginImplName]
if !ok {
- return nil, fmt.Errorf("plugin implement not supported
[#{opts.PluginImplName}]")
+ return nil, ErrPluginNotSupport
}
return f(opts)
}
diff --git a/datasource/mongo/mongo.go b/datasource/mongo/mongo.go
index 7d93769..6e96711 100644
--- a/datasource/mongo/mongo.go
+++ b/datasource/mongo/mongo.go
@@ -39,7 +39,7 @@ type DataSource struct {
func NewDataSource(opts datasource.Options) (datasource.DataSource, error) {
// TODO: construct a reasonable DataSource instance
- log.Warnf("dependency data source enable etcd mode")
+ log.Warn("dependency data source enable etcd mode")
inst := &DataSource{
SchemaEditable: opts.SchemaEditable,
@@ -71,7 +71,7 @@ func (ds *DataSource) initPlugins() error {
kind := config.GetString("registry.heartbeat.kind", "heartbeatchecker",
config.WithStandby("heartbeat_plugin"))
err := heartbeat.Init(heartbeat.Options{PluginImplName:
heartbeat.ImplName(kind)})
if err != nil {
- log.Fatalf(err, "heartbeat init failed")
+ log.Fatal("heartbeat init failed", err)
return err
}
return nil
diff --git a/datasource/mongo/ms.go b/datasource/mongo/ms.go
index f1befa3..63204aa 100644
--- a/datasource/mongo/ms.go
+++ b/datasource/mongo/ms.go
@@ -19,7 +19,6 @@ package mongo
import (
"context"
- "errors"
"fmt"
"strconv"
"time"
@@ -27,6 +26,7 @@ import (
"github.com/apache/servicecomb-service-center/datasource"
"github.com/apache/servicecomb-service-center/datasource/mongo/client"
"github.com/apache/servicecomb-service-center/datasource/mongo/heartbeat"
+ "github.com/apache/servicecomb-service-center/pkg/gopool"
"github.com/apache/servicecomb-service-center/pkg/log"
"github.com/apache/servicecomb-service-center/pkg/util"
apt "github.com/apache/servicecomb-service-center/server/core"
@@ -86,7 +86,8 @@ func (ds *DataSource) RegisterService(ctx context.Context,
request *pb.CreateSer
}
remoteIP := util.GetIPFromContext(ctx)
- log.Infof("create micro-service[%s][%s] successfully,operator: %s",
service.ServiceId, insertRes.InsertedID, remoteIP)
+ log.Info(fmt.Sprintf("create micro-service[%s][%s]
successfully,operator: %s",
+ service.ServiceId, insertRes.InsertedID, remoteIP))
return &pb.CreateServiceResponse{
Response: pb.CreateResponse(pb.ResponseSuccess, "Register
service successfully"),
@@ -100,7 +101,7 @@ func (ds *DataSource) GetServices(ctx context.Context,
request *pb.GetServicesRe
domain := util.ParseDomain(ctx)
project := util.ParseProject(ctx)
- filter := bson.M{Domain: domain, Project: project}
+ filter := bson.M{ColumnDomain: domain, ColumnProject: project}
services, err := GetServices(ctx, filter)
if err != nil {
@@ -119,7 +120,10 @@ func (ds *DataSource) GetApplications(ctx context.Context,
request *pb.GetAppsRe
domain := util.ParseDomain(ctx)
project := util.ParseProject(ctx)
- filter := bson.M{Domain: domain, Project: project, ServiceEnv:
request.Environment}
+ filter := bson.M{
+ ColumnDomain: domain,
+ ColumnProject: project,
+ StringBuilder([]string{ColumnServiceInfo, ColumnEnv}):
request.Environment}
services, err := GetServices(ctx, filter)
if err != nil {
@@ -155,7 +159,7 @@ func (ds *DataSource) GetService(ctx context.Context,
request *pb.GetServiceRequ
*pb.GetServiceResponse, error) {
svc, err := GetService(ctx, GeneratorServiceFilter(ctx,
request.ServiceId))
if err != nil {
- log.Errorf(err, "failed to get single service [%s] from mongo",
request.ServiceId)
+ log.Error(fmt.Sprintf("failed to get single service %s from
mongo", request.ServiceId), err)
return &pb.GetServiceResponse{
Response: pb.CreateResponse(pb.ErrInternal, "get
service data from mongodb failed."),
}, err
@@ -281,24 +285,23 @@ func DelServicePri(ctx context.Context, serviceID string,
force bool) (*pb.Respo
}
if serviceID == apt.Service.ServiceId {
- err := errors.New("not allow to delete service center")
- log.Errorf(err, "%s micro-service[%s] failed, operator: %s",
title, serviceID, remoteIP)
- return pb.CreateResponse(pb.ErrInvalidParams, err.Error()), nil
+ log.Error(fmt.Sprintf("%s micro-service %s failed, operator:
%s", title, serviceID, remoteIP), ErrNotAllowDeleteSC)
+ return pb.CreateResponse(pb.ErrInvalidParams,
ErrNotAllowDeleteSC.Error()), nil
}
microservice, err := GetService(ctx, GeneratorServiceFilter(ctx,
serviceID))
if err != nil {
- log.Errorf(err, "%s micro-service[%s] failed, get service file
failed, operator: %s",
- title, serviceID, remoteIP)
+ log.Error(fmt.Sprintf("%s micro-service %s failed, get service
file failed, operator: %s",
+ title, serviceID, remoteIP), err)
return pb.CreateResponse(pb.ErrInternal, err.Error()), err
}
if microservice == nil {
- log.Errorf(err, "%s micro-service[%s] failed, service does not
exist, operator: %s",
- title, serviceID, remoteIP)
+ log.Error(fmt.Sprintf("%s micro-service %s failed, service does
not exist, operator: %s",
+ title, serviceID, remoteIP), err)
return pb.CreateResponse(pb.ErrServiceNotExists, "Service does
not exist."), nil
}
// 强制删除,则与该服务相关的信息删除,非强制删除:
如果作为该被依赖(作为provider,提供服务,且不是只存在自依赖)或者存在实例,则不能删除
if !force {
- log.Infof("force delete,should del instance...")
+ log.Info("force delete,should del instance...")
//todo wait for dep interface
}
filter := GeneratorServiceFilter(ctx, serviceID)
@@ -329,9 +332,13 @@ func (ds *DataSource) UpdateService(ctx context.Context,
request *pb.UpdateServi
}, nil
}
- err = UpdateService(ctx, GeneratorServiceFilter(ctx,
request.ServiceId), bson.M{"$set": bson.M{ServiceModTime:
strconv.FormatInt(time.Now().Unix(), 10), ServiceProperty: request.Properties}})
+ updateData := bson.M{
+ "$set": bson.M{
+ StringBuilder([]string{ColumnServiceInfo,
ColumnModTime}): strconv.FormatInt(time.Now().Unix(), 10),
+ StringBuilder([]string{ColumnServiceInfo,
ColumnProperty}): request.Properties}}
+ err = UpdateService(ctx, GeneratorServiceFilter(ctx,
request.ServiceId), updateData)
if err != nil {
- log.Errorf(err, "update service [%s] properties failed, update
mongo failed", request.ServiceId)
+ log.Error(fmt.Sprintf("update service %s properties failed,
update mongo failed", request.ServiceId), err)
return &pb.UpdateServicePropsResponse{
Response: pb.CreateResponse(pb.ErrUnavailableBackend,
"Update doc in mongo failed."),
}, nil
@@ -362,8 +369,7 @@ func (ds *DataSource) GetServiceDetail(ctx context.Context,
request *pb.GetServi
svc := mgSvc.ServiceInfo
versions, err := GetServicesVersions(ctx, bson.M{})
if err != nil {
- log.Errorf(err, "get service[%s/%s/%s] all versions failed",
- svc.Environment, svc.AppId, svc.ServiceName)
+ log.Error(fmt.Sprintf("get service %s %s %s all versions
failed", svc.Environment, svc.AppId, svc.ServiceName), err)
return &pb.GetServiceDetailResponse{
Response: pb.CreateResponse(pb.ErrInternal,
err.Error()),
}, err
@@ -403,7 +409,7 @@ func (ds *DataSource) GetServicesInfo(ctx context.Context,
request *pb.GetServic
//todo add get statistics info
services, err := GetMongoServices(ctx, bson.M{})
if err != nil {
- log.Errorf(err, "get all services by domain failed")
+ log.Error("get all services by domain failed", err)
return &pb.GetServicesInfoResponse{
Response: pb.CreateResponse(pb.ErrInternal,
err.Error()),
}, err
@@ -443,7 +449,7 @@ func (ds *DataSource) GetServicesInfo(ctx context.Context,
request *pb.GetServic
func (ds *DataSource) AddTags(ctx context.Context, request
*pb.AddServiceTagsRequest) (*pb.AddServiceTagsResponse, error) {
service, err := GetService(ctx, GeneratorServiceFilter(ctx,
request.ServiceId))
if err != nil {
- log.Errorf(err, "failed to add tags for service [%s] for get
service failed,", request.ServiceId)
+ log.Error(fmt.Sprintf("failed to add tags for service %s for
get service failed", request.ServiceId), err)
return &pb.AddServiceTagsResponse{
Response: pb.CreateResponse(pb.ErrInternal, "Failed to
check service exist"),
}, nil
@@ -460,9 +466,9 @@ func (ds *DataSource) AddTags(ctx context.Context, request
*pb.AddServiceTagsReq
}
tags[key] = value
}
- err = UpdateService(ctx, GeneratorServiceFilter(ctx,
request.ServiceId), bson.M{"$set": bson.M{ServiceTag: tags}})
+ err = UpdateService(ctx, GeneratorServiceFilter(ctx,
request.ServiceId), bson.M{"$set": bson.M{ColumnTag: tags}})
if err != nil {
- log.Errorf(err, "update service [%s] tags failed.",
request.ServiceId)
+ log.Error(fmt.Sprintf("update service %s tags failed.",
request.ServiceId), err)
return &pb.AddServiceTagsResponse{
Response: pb.CreateResponse(pb.ErrInternal,
err.Error()),
}, nil
@@ -475,7 +481,7 @@ func (ds *DataSource) AddTags(ctx context.Context, request
*pb.AddServiceTagsReq
func (ds *DataSource) GetTags(ctx context.Context, request
*pb.GetServiceTagsRequest) (*pb.GetServiceTagsResponse, error) {
svc, err := GetService(ctx, GeneratorServiceFilter(ctx,
request.ServiceId))
if err != nil {
- log.Errorf(err, "failed to get service [%s] tags",
request.ServiceId)
+ log.Error(fmt.Sprintf("failed to get service %s tags",
request.ServiceId), err)
return &pb.GetServiceTagsResponse{
Response: pb.CreateResponse(pb.ErrInternal,
err.Error()),
}, nil
@@ -494,7 +500,7 @@ func (ds *DataSource) GetTags(ctx context.Context, request
*pb.GetServiceTagsReq
func (ds *DataSource) UpdateTag(ctx context.Context, request
*pb.UpdateServiceTagRequest) (*pb.UpdateServiceTagResponse, error) {
svc, err := GetService(ctx, GeneratorServiceFilter(ctx,
request.ServiceId))
if err != nil {
- log.Errorf(err, "failed to get service [%s] tags",
request.ServiceId)
+ log.Error(fmt.Sprintf("failed to get %s tags",
request.ServiceId), err)
return &pb.UpdateServiceTagResponse{
Response: pb.CreateResponse(pb.ErrInternal,
err.Error()),
}, nil
@@ -518,9 +524,9 @@ func (ds *DataSource) UpdateTag(ctx context.Context,
request *pb.UpdateServiceTa
}
newTags[request.Key] = request.Value
- err = UpdateService(ctx, GeneratorServiceFilter(ctx,
request.ServiceId), bson.M{"$set": bson.M{ServiceTag: newTags}})
+ err = UpdateService(ctx, GeneratorServiceFilter(ctx,
request.ServiceId), bson.M{"$set": bson.M{ColumnTag: newTags}})
if err != nil {
- log.Errorf(err, "update service [%s] tags failed.",
request.ServiceId)
+ log.Error(fmt.Sprintf("update service %s tags failed",
request.ServiceId), err)
return &pb.UpdateServiceTagResponse{
Response: pb.CreateResponse(pb.ErrInternal,
err.Error()),
}, nil
@@ -533,7 +539,7 @@ func (ds *DataSource) UpdateTag(ctx context.Context,
request *pb.UpdateServiceTa
func (ds *DataSource) DeleteTags(ctx context.Context, request
*pb.DeleteServiceTagsRequest) (*pb.DeleteServiceTagsResponse, error) {
svc, err := GetService(ctx, GeneratorServiceFilter(ctx,
request.ServiceId))
if err != nil {
- log.Errorf(err, "failed to get service [%s] tags",
request.ServiceId)
+ log.Error(fmt.Sprintf("failed to get service %s tags",
request.ServiceId), err)
return &pb.DeleteServiceTagsResponse{
Response: pb.CreateResponse(pb.ErrInternal,
err.Error()),
}, nil
@@ -558,9 +564,9 @@ func (ds *DataSource) DeleteTags(ctx context.Context,
request *pb.DeleteServiceT
delete(newTags, key)
}
}
- err = UpdateService(ctx, GeneratorServiceFilter(ctx,
request.ServiceId), bson.M{"$set": bson.M{ServiceTag: newTags}})
+ err = UpdateService(ctx, GeneratorServiceFilter(ctx,
request.ServiceId), bson.M{"$set": bson.M{ColumnTag: newTags}})
if err != nil {
- log.Errorf(err, "delete service [%s] tags failed.",
request.ServiceId)
+ log.Error(fmt.Sprintf("delete service %s tags failed",
request.ServiceId), err)
return &pb.DeleteServiceTagsResponse{
Response: pb.CreateResponse(pb.ErrInternal,
err.Error()),
}, nil
@@ -698,7 +704,7 @@ func (ds *DataSource) ModifySchema(ctx context.Context,
request *pb.ModifySchema
defer session.EndSession(ctx)
err = ds.modifySchema(ctx, request.ServiceId, &schema)
if err != nil {
- log.Errorf(err, "modify schema[%s/%s] failed, operator: %s",
serviceID, schemaID, remoteIP)
+ log.Error(fmt.Sprintf("modify schema %s %s failed, operator
%s", serviceID, schemaID, remoteIP), err)
errAbort := session.AbortTransaction(ctx)
if errAbort != nil {
return &pb.ModifySchemaResponse{
@@ -715,7 +721,7 @@ func (ds *DataSource) ModifySchema(ctx context.Context,
request *pb.ModifySchema
Response: pb.CreateResponse(pb.ErrInternal, "Txn
ModifySchema CommitTransaction failed."),
}, err
}
- log.Infof("modify schema[%s/%s] successfully, operator: %s", serviceID,
schemaID, remoteIP)
+ log.Info(fmt.Sprintf("modify schema[%s/%s] successfully, operator: %s",
serviceID, schemaID, remoteIP))
return &pb.ModifySchemaResponse{
Response: pb.CreateResponse(pb.ResponseSuccess, "modify schema
info success."),
}, nil
@@ -791,13 +797,13 @@ func (ds *DataSource) modifySchema(ctx context.Context,
serviceID string, schema
}
if schema != nil {
if len(schema.Summary) == 0 {
- log.Errorf(err, "modify schema[%s/%s] failed,
get schema summary failed, operator: %s",
- serviceID, schema.SchemaId, remoteIP)
+ log.Error(fmt.Sprintf("modify schema %s %s
failed, get schema summary failed, operator: %s",
+ serviceID, schema.SchemaId, remoteIP),
err)
return pb.NewError(pb.ErrUnavailableBackend,
err.Error())
}
if len(respSchema.SchemaSummary) != 0 {
- log.Errorf(err, "%s mode, schema[%s/%s] already
exist, can not be changed, operator: %s",
- pb.ENV_PROD, serviceID,
schema.SchemaId, remoteIP)
+ log.Error(fmt.Sprintf("mode, schema %s %s
already exist, can not be changed, operator: %s",
+ serviceID, schema.SchemaId, remoteIP),
err)
return pb.NewError(pb.ErrModifySchemaNotAllow,
"schema already exist, can not be changed request "+pb.ENV_PROD)
}
}
@@ -812,12 +818,14 @@ func (ds *DataSource) modifySchema(ctx context.Context,
serviceID string, schema
}
}
if len(newSchemas) != len(microservice.Schemas) {
- err := UpdateService(ctx, GeneratorServiceFilter(ctx,
serviceID), bson.M{"$set": bson.M{ServiceSchemas: newSchemas}})
+
+ updateData := bson.M{StringBuilder([]string{ColumnServiceInfo,
ColumnSchemas}): newSchemas}
+ err := UpdateService(ctx, GeneratorServiceFilter(ctx,
serviceID), bson.M{"$set": updateData})
if err != nil {
return pb.NewError(pb.ErrInternal, err.Error())
}
}
- newSchema := bson.M{"$set": bson.M{SchemaInfo: schema.Schema,
SchemaSummary: schema.Summary}}
+ newSchema := bson.M{"$set": bson.M{ColumnSchemaInfo: schema.Schema,
ColumnSchemaSummary: schema.Summary}}
err = UpdateSchema(ctx, GeneratorSchemaFilter(ctx, serviceID,
schema.SchemaId), newSchema, options.FindOneAndUpdate().SetUpsert(true))
if err != nil {
return pb.NewError(pb.ErrInternal, err.Error())
@@ -830,8 +838,7 @@ func (ds *DataSource) modifySchemas(ctx context.Context,
service *pb.MicroServic
serviceID := service.ServiceId
schemasFromDatabase, err := GetSchemas(ctx, GeneratorServiceFilter(ctx,
serviceID))
if err != nil {
- log.Errorf(nil, "modify service[%s] schemas failed, get schemas
failed, operator: %s",
- serviceID, remoteIP)
+ log.Error(fmt.Sprintf("modify service %s schemas failed, get
schemas failed, operator: %s", serviceID, remoteIP), err)
return pb.NewError(pb.ErrUnavailableBackend, err.Error())
}
needUpdateSchemas, needAddSchemas, needDeleteSchemas, nonExistSchemaIds
:=
@@ -839,16 +846,17 @@ func (ds *DataSource) modifySchemas(ctx context.Context,
service *pb.MicroServic
if !ds.isSchemaEditable(service) {
if len(service.Schemas) == 0 {
//todo add quota check
- err := UpdateService(ctx, GeneratorServiceFilter(ctx,
serviceID), bson.M{"$set": bson.M{ServiceSchemas: nonExistSchemaIds}})
+ updateData :=
bson.M{StringBuilder([]string{ColumnServiceInfo, ColumnSchemas}):
nonExistSchemaIds}
+ err := UpdateService(ctx, GeneratorServiceFilter(ctx,
serviceID), bson.M{"$set": updateData})
if err != nil {
- log.Errorf(err, "modify service[%s] schemas
failed, update service.Schemas failed, operator: %s",
- serviceID, remoteIP)
+ log.Error(fmt.Sprintf("modify service %s
schemas failed, update service.Schemas failed, operator: %s",
+ serviceID, remoteIP), err)
return pb.NewError(pb.ErrInternal, err.Error())
}
} else {
if len(nonExistSchemaIds) != 0 {
errInfo := fmt.Errorf("non-existent schemaIDs
%v", nonExistSchemaIds)
- log.Errorf(errInfo, "modify service[%s] schemas
failed, operator: %s", serviceID, remoteIP)
+ log.Error(fmt.Sprintf("modify service %s
schemas failed, operator: %s", serviceID, remoteIP), err)
return pb.NewError(pb.ErrUndefinedSchemaID,
errInfo.Error())
}
for _, needUpdateSchema := range needUpdateSchemas {
@@ -857,20 +865,20 @@ func (ds *DataSource) modifySchemas(ctx context.Context,
service *pb.MicroServic
return pb.NewError(pb.ErrInternal,
err.Error())
}
if !exist {
- err := UpdateSchema(ctx,
GeneratorSchemaFilter(ctx, serviceID, needUpdateSchema.SchemaId),
bson.M{"$set": bson.M{SchemaInfo: needUpdateSchema.Schema, SchemaSummary:
needUpdateSchema.Summary}}, options.FindOneAndUpdate().SetUpsert(true))
+ err := UpdateSchema(ctx,
GeneratorSchemaFilter(ctx, serviceID, needUpdateSchema.SchemaId),
bson.M{"$set": bson.M{ColumnSchemaInfo: needUpdateSchema.Schema,
ColumnSchemaSummary: needUpdateSchema.Summary}},
options.FindOneAndUpdate().SetUpsert(true))
if err != nil {
return
pb.NewError(pb.ErrInternal, err.Error())
}
} else {
- log.Warnf("schema[%s/%s] and it's
summary already exist, skip to update, operator: %s",
- serviceID,
needUpdateSchema.SchemaId, remoteIP)
+ log.Warn(fmt.Sprintf("schema[%s/%s] and
it's summary already exist, skip to update, operator: %s",
+ serviceID,
needUpdateSchema.SchemaId, remoteIP))
}
}
}
for _, schema := range needAddSchemas {
- log.Infof("add new schema[%s/%s], operator: %s",
serviceID, schema.SchemaId, remoteIP)
- err := UpdateSchema(ctx, GeneratorSchemaFilter(ctx,
serviceID, schema.SchemaId), bson.M{"$set": bson.M{SchemaInfo: schema.Schema,
SchemaSummary: schema.Summary}}, options.FindOneAndUpdate().SetUpsert(true))
+ log.Info(fmt.Sprintf("add new schema[%s/%s], operator:
%s", serviceID, schema.SchemaId, remoteIP))
+ err := UpdateSchema(ctx, GeneratorSchemaFilter(ctx,
serviceID, schema.SchemaId), bson.M{"$set": bson.M{ColumnSchemaInfo:
schema.Schema, ColumnSchemaSummary: schema.Summary}},
options.FindOneAndUpdate().SetUpsert(true))
if err != nil {
return pb.NewError(pb.ErrInternal, err.Error())
}
@@ -879,8 +887,8 @@ func (ds *DataSource) modifySchemas(ctx context.Context,
service *pb.MicroServic
var schemaIDs []string
for _, schema := range needAddSchemas {
- log.Infof("add new schema[%s/%s], operator: %s",
serviceID, schema.SchemaId, remoteIP)
- err := UpdateSchema(ctx, GeneratorSchemaFilter(ctx,
serviceID, schema.SchemaId), bson.M{"$set": bson.M{SchemaInfo: schema.Schema,
SchemaSummary: schema.Summary}}, options.FindOneAndUpdate().SetUpsert(true))
+ log.Info(fmt.Sprintf("add new schema[%s/%s], operator:
%s", serviceID, schema.SchemaId, remoteIP))
+ err := UpdateSchema(ctx, GeneratorSchemaFilter(ctx,
serviceID, schema.SchemaId), bson.M{"$set": bson.M{ColumnSchemaInfo:
schema.Schema, ColumnSchemaSummary: schema.Summary}},
options.FindOneAndUpdate().SetUpsert(true))
if err != nil {
return pb.NewError(pb.ErrInternal, err.Error())
}
@@ -888,8 +896,8 @@ func (ds *DataSource) modifySchemas(ctx context.Context,
service *pb.MicroServic
}
for _, schema := range needUpdateSchemas {
- log.Infof("update schema[%s/%s], operator: %s",
serviceID, schema.SchemaId, remoteIP)
- err := UpdateSchema(ctx, GeneratorSchemaFilter(ctx,
serviceID, schema.SchemaId), bson.M{"$set": bson.M{SchemaInfo: schema.Schema,
SchemaSummary: schema.Summary}}, options.FindOneAndUpdate().SetUpsert(true))
+ log.Info(fmt.Sprintf("update schema[%s/%s], operator:
%s", serviceID, schema.SchemaId, remoteIP))
+ err := UpdateSchema(ctx, GeneratorSchemaFilter(ctx,
serviceID, schema.SchemaId), bson.M{"$set": bson.M{ColumnSchemaInfo:
schema.Schema, ColumnSchemaSummary: schema.Summary}},
options.FindOneAndUpdate().SetUpsert(true))
if err != nil {
return pb.NewError(pb.ErrInternal, err.Error())
}
@@ -897,17 +905,17 @@ func (ds *DataSource) modifySchemas(ctx context.Context,
service *pb.MicroServic
}
for _, schema := range needDeleteSchemas {
- log.Infof("delete non-existent schema[%s/%s], operator:
%s", serviceID, schema.SchemaId, remoteIP)
+ log.Info(fmt.Sprintf("delete non-existent
schema[%s/%s], operator: %s", serviceID, schema.SchemaId, remoteIP))
err = DeleteSchema(ctx, GeneratorSchemaFilter(ctx,
serviceID, schema.SchemaId))
if err != nil {
return pb.NewError(pb.ErrInternal, err.Error())
}
}
- err := UpdateService(ctx, GeneratorServiceFilter(ctx,
serviceID), bson.M{"$set": bson.M{ServiceSchemas: schemaIDs}})
+ updateData := bson.M{StringBuilder([]string{ColumnServiceInfo,
ColumnSchemas}): schemaIDs}
+ err := UpdateService(ctx, GeneratorServiceFilter(ctx,
serviceID), bson.M{"$set": updateData})
if err != nil {
- log.Errorf(err, "modify service[%s] schemas failed,
update service.Schemas failed, operator: %s",
- serviceID, remoteIP)
+ log.Error(fmt.Sprintf("modify service %s schemas
failed, update service.Schemas failed, operator: %s", serviceID, remoteIP), err)
return pb.NewError(pb.ErrInternal, err.Error())
}
}
@@ -917,7 +925,7 @@ func (ds *DataSource) modifySchemas(ctx context.Context,
service *pb.MicroServic
func (ds *DataSource) AddRule(ctx context.Context, request
*pb.AddServiceRulesRequest) (*pb.AddServiceRulesResponse, error) {
exist, err := ServiceExistID(ctx, request.ServiceId)
if err != nil {
- log.Errorf(err, "failed to add rules for service [%s] for get
service failed,", request.ServiceId)
+ log.Error(fmt.Sprintf("failed to add rules for service %s for
get service failed", request.ServiceId), err)
return &pb.AddServiceRulesResponse{
Response: pb.CreateResponse(pb.ErrInternal, "Failed to
check service exist"),
}, nil
@@ -1013,7 +1021,7 @@ func (ds *DataSource) DeleteRule(ctx context.Context,
request *pb.DeleteServiceR
*pb.DeleteServiceRulesResponse, error) {
exist, err := ServiceExistID(ctx, request.ServiceId)
if err != nil {
- log.Errorf(err, "failed to add tags for service [%s] for get
service failed,", request.ServiceId)
+ log.Error(fmt.Sprintf("failed to add tags for service %s for
get service failed", request.ServiceId), err)
return &pb.DeleteServiceRulesResponse{
Response: pb.CreateResponse(pb.ErrInternal, "Failed to
check service exist"),
}, err
@@ -1046,7 +1054,6 @@ func (ds *DataSource) UpdateRule(ctx context.Context,
request *pb.UpdateServiceR
if err != nil {
return &pb.UpdateServiceRuleResponse{
Response: pb.CreateResponse(pb.ErrServiceNotExists,
"UpdateRule failed for get service failed."),
- //Schemas: nil,
}, nil
}
if !exist {
@@ -1077,12 +1084,14 @@ func (ds *DataSource) UpdateRule(ctx context.Context,
request *pb.UpdateServiceR
}, nil
}
- newRule := bson.M{"$set": bson.M{RuleRuletype: request.Rule.RuleType,
- RulePattern: request.Rule.Pattern, RuleAttribute:
request.Rule.Attribute,
- RuleDescription: request.Rule.Description,
- RuleModTime: strconv.FormatInt(time.Now().Unix(), 10)}}
+ newRule := bson.M{
+ StringBuilder([]string{ColumnRuleInfo, ColumnRuleType}):
request.Rule.RuleType,
+ StringBuilder([]string{ColumnRuleInfo, ColumnPattern}):
request.Rule.Pattern,
+ StringBuilder([]string{ColumnRuleInfo, ColumnAttribute}):
request.Rule.Attribute,
+ StringBuilder([]string{ColumnRuleInfo, ColumnDescription}):
request.Rule.Description,
+ StringBuilder([]string{ColumnRuleInfo, ColumnModTime}):
strconv.FormatInt(time.Now().Unix(), 10)}
- err = UpdateRule(ctx, GeneratorRuleFilter(ctx, request.ServiceId,
request.RuleId), newRule)
+ err = UpdateRule(ctx, GeneratorRuleFilter(ctx, request.ServiceId,
request.RuleId), bson.M{"$set": newRule})
if err != nil {
return &pb.UpdateServiceRuleResponse{
Response: pb.CreateResponse(pb.ErrInternal,
err.Error()),
@@ -1188,7 +1197,7 @@ func getServiceDetailUtil(ctx context.Context, mgs
*Service, countOnly bool, opt
case "rules":
rules, err := GetRules(ctx, mgs.ServiceInfo.ServiceId)
if err != nil {
- log.Errorf(err, "get service[%s]'s all rules
failed", mgs.ServiceInfo.ServiceId)
+ log.Error(fmt.Sprintf("get service %s's all
rules failed", mgs.ServiceInfo.ServiceId), err)
return nil, err
}
for _, rule := range rules {
@@ -1200,7 +1209,7 @@ func getServiceDetailUtil(ctx context.Context, mgs
*Service, countOnly bool, opt
case "schemas":
schemas, err := GetSchemas(ctx,
GeneratorServiceFilter(ctx, mgs.ServiceInfo.ServiceId))
if err != nil {
- log.Errorf(err, "get service[%s]'s all schemas
failed", mgs.ServiceInfo.ServiceId)
+ log.Error(fmt.Sprintf("get service %s's all
schemas failed", mgs.ServiceInfo.ServiceId), err)
return nil, err
}
serviceDetail.SchemaInfos = schemas
@@ -1209,7 +1218,7 @@ func getServiceDetailUtil(ctx context.Context, mgs
*Service, countOnly bool, opt
case "":
continue
default:
- log.Errorf(nil, "request option[%s] is invalid", opt)
+ log.Info(fmt.Sprintf("request option %s is invalid",
opt))
}
}
return serviceDetail, nil
@@ -1222,7 +1231,7 @@ func UpdateService(ctx context.Context, filter
interface{}, m bson.M) error {
func GetRules(ctx context.Context, serviceID string) ([]*pb.ServiceRule,
error) {
domain := util.ParseDomain(ctx)
project := util.ParseProject(ctx)
- filter := bson.M{Domain: domain, Project: project, RuleServiceID:
serviceID}
+ filter := bson.M{ColumnDomain: domain, ColumnProject: project,
ColumnServiceID: serviceID}
ruleRes, err := client.GetMongoClient().Find(ctx, CollectionRule,
filter)
if err != nil {
@@ -1254,7 +1263,7 @@ func DeleteSchema(ctx context.Context, filter
interface{}) error {
return err
}
if !res {
- return errors.New("delete schema failed")
+ return ErrDeleteSchemaFailed
}
return nil
}
@@ -1267,39 +1276,61 @@ func GeneratorServiceFilter(ctx context.Context,
serviceID string) bson.M {
domain := util.ParseDomain(ctx)
project := util.ParseProject(ctx)
- return bson.M{Domain: domain, Project: project, ServiceServiceID:
serviceID}
+ return bson.M{
+ ColumnDomain: domain,
+ ColumnProject: project,
+ StringBuilder([]string{ColumnServiceInfo, ColumnServiceID}):
serviceID}
}
func GeneratorServiceNameFilter(ctx context.Context, service
*pb.MicroServiceKey) bson.M {
domain := util.ParseDomain(ctx)
project := util.ParseProject(ctx)
- return bson.M{Domain: domain, Project: project, ServiceEnv:
service.Environment, ServiceAppID: service.AppId, ServiceServiceName:
service.ServiceName, ServiceVersion: service.Version}
+ return bson.M{
+ ColumnDomain: domain,
+ ColumnProject: project,
+ StringBuilder([]string{ColumnServiceInfo, ColumnEnv}):
service.Environment,
+ StringBuilder([]string{ColumnServiceInfo, ColumnAppID}):
service.AppId,
+ StringBuilder([]string{ColumnServiceInfo, ColumnServiceName}):
service.ServiceName,
+ StringBuilder([]string{ColumnServiceInfo, ColumnVersion}):
service.Version}
}
func GeneratorServiceAliasFilter(ctx context.Context, service
*pb.MicroServiceKey) bson.M {
domain := util.ParseDomain(ctx)
project := util.ParseProject(ctx)
- return bson.M{Domain: domain, Project: project, ServiceEnv:
service.Environment, ServiceAppID: service.AppId, ServiceAlias: service.Alias,
ServiceVersion: service.Version}
+ return bson.M{
+ ColumnDomain: domain,
+ ColumnProject: project,
+ StringBuilder([]string{ColumnServiceInfo, ColumnEnv}):
service.Environment,
+ StringBuilder([]string{ColumnServiceInfo, ColumnAppID}):
service.AppId,
+ StringBuilder([]string{ColumnServiceInfo, ColumnAlias}):
service.Alias,
+ StringBuilder([]string{ColumnServiceInfo, ColumnVersion}):
service.Version}
}
func GeneratorRuleAttFilter(ctx context.Context, serviceID, attribute, pattern
string) bson.M {
- return bson.M{RuleServiceID: serviceID, RuleAttribute: attribute,
RulePattern: pattern}
+ return bson.M{
+ ColumnServiceID: serviceID,
+ StringBuilder([]string{ColumnRuleInfo, ColumnAttribute}):
attribute,
+ StringBuilder([]string{ColumnRuleInfo, ColumnPattern}):
pattern}
}
func GeneratorSchemaFilter(ctx context.Context, serviceID, schemaID string)
bson.M {
domain := util.ParseDomain(ctx)
project := util.ParseProject(ctx)
- return bson.M{Domain: domain, Project: project, SchemaServiceID:
serviceID, SchemaID: schemaID}
+ return bson.M{ColumnDomain: domain, ColumnProject: project,
ColumnServiceID: serviceID, ColumnSchemaID: schemaID}
}
func GeneratorRuleFilter(ctx context.Context, serviceID, ruleID string) bson.M
{
domain := util.ParseDomain(ctx)
project := util.ParseProject(ctx)
- return bson.M{Domain: domain, Project: project, RuleServiceID:
serviceID, RuleRuleID: ruleID}
+ return bson.M{
+ ColumnDomain: domain,
+ ColumnProject: project,
+ ColumnServiceID: serviceID,
+ StringBuilder([]string{ColumnRuleInfo, ColumnRuleID}): ruleID}
}
func GetSchemas(ctx context.Context, filter bson.M) ([]*pb.Schema, error) {
@@ -1346,56 +1377,876 @@ func SchemaExist(ctx context.Context, serviceID,
schemaID string) (bool, error)
// Instance management
func (ds *DataSource) RegisterInstance(ctx context.Context, request
*pb.RegisterInstanceRequest) (*pb.RegisterInstanceResponse, error) {
- return &pb.RegisterInstanceResponse{}, nil
+ remoteIP := util.GetIPFromContext(ctx)
+ instance := request.Instance
+
+ // 允许自定义 id
+ if len(instance.InstanceId) > 0 {
+ resp, err := ds.Heartbeat(ctx, &pb.HeartbeatRequest{
+ InstanceId: instance.InstanceId,
+ ServiceId: instance.ServiceId,
+ })
+ if err != nil || resp == nil {
+ log.Error(fmt.Sprintf("register service %s's instance
failed, endpoints %s, host '%s', operator %s",
+ instance.ServiceId, instance.Endpoints,
instance.HostName, remoteIP), err)
+ return &pb.RegisterInstanceResponse{
+ Response: pb.CreateResponse(pb.ErrInternal,
err.Error()),
+ }, nil
+ }
+ switch resp.Response.GetCode() {
+ case pb.ResponseSuccess:
+ log.Info(fmt.Sprintf("register instance successful,
reuse instance[%s/%s], operator %s",
+ instance.ServiceId, instance.InstanceId,
remoteIP))
+ return &pb.RegisterInstanceResponse{
+ Response: resp.Response,
+ InstanceId: instance.InstanceId,
+ }, nil
+ case pb.ErrInstanceNotExists:
+ // register a new one
+ return registryInstance(ctx, request)
+ default:
+ log.Error(fmt.Sprintf("register instance failed, reuse
instance %s %s, operator %s",
+ instance.ServiceId, instance.InstanceId,
remoteIP), err)
+ return &pb.RegisterInstanceResponse{
+ Response: resp.Response,
+ }, err
+ }
+ }
+
+ if err := preProcessRegisterInstance(ctx, instance); err != nil {
+ log.Error(fmt.Sprintf("register service %s instance failed,
endpoints %s, host %s operator %s",
+ instance.ServiceId, instance.Endpoints,
instance.HostName, remoteIP), err)
+ return &pb.RegisterInstanceResponse{
+ Response: pb.CreateResponseWithSCErr(err),
+ }, nil
+ }
+ return registryInstance(ctx, request)
}
// GetInstances returns instances under the current domain
func (ds *DataSource) GetInstance(ctx context.Context, request
*pb.GetOneInstanceRequest) (*pb.GetOneInstanceResponse, error) {
- return &pb.GetOneInstanceResponse{}, nil
+ service := &Service{}
+ var err error
+ if len(request.ConsumerServiceId) > 0 {
+ filter := GeneratorServiceFilter(ctx, request.ConsumerServiceId)
+ service, err = GetService(ctx, filter)
+ if err != nil {
+ log.Error(fmt.Sprintf(" get consumer failed, consumer
%s find provider instance %s",
+ request.ConsumerServiceId,
request.ProviderInstanceId), err)
+ return &pb.GetOneInstanceResponse{
+ Response: pb.CreateResponse(pb.ErrInternal,
err.Error()),
+ }, err
+ }
+ if service == nil {
+ log.Error(fmt.Sprintf("consumer does not exist,
consumer %s find provider instance %s %s",
+ request.ConsumerServiceId,
request.ProviderServiceId, request.ProviderInstanceId), err)
+ return &pb.GetOneInstanceResponse{
+ Response:
pb.CreateResponse(pb.ErrServiceNotExists,
+ fmt.Sprintf("Consumer[%s] does not
exist.", request.ConsumerServiceId)),
+ }, nil
+ }
+ }
+
+ filter := GeneratorServiceFilter(ctx, request.ProviderServiceId)
+ provider, err := GetService(ctx, filter)
+ if err != nil {
+ log.Error(fmt.Sprintf("get provider failed, consumer %s find
provider instance %s %s",
+ request.ConsumerServiceId, request.ProviderServiceId,
request.ProviderInstanceId), err)
+ return &pb.GetOneInstanceResponse{
+ Response: pb.CreateResponse(pb.ErrInternal,
err.Error()),
+ }, err
+ }
+ if provider == nil {
+ log.Error(fmt.Sprintf("provider does not exist, consumer %s
find provider instance %s %s",
+ request.ConsumerServiceId, request.ProviderServiceId,
request.ProviderInstanceId), err)
+ return &pb.GetOneInstanceResponse{
+ Response: pb.CreateResponse(pb.ErrServiceNotExists,
+ fmt.Sprintf("Provider[%s] does not exist.",
request.ProviderServiceId)),
+ }, nil
+ }
+
+ findFlag := func() string {
+ return fmt.Sprintf("Consumer[%s][%s/%s/%s/%s] find
provider[%s][%s/%s/%s/%s] instance[%s]",
+ request.ConsumerServiceId,
service.ServiceInfo.Environment, service.ServiceInfo.AppId,
service.ServiceInfo.ServiceName, service.ServiceInfo.Version,
+ provider.ServiceInfo.ServiceId,
provider.ServiceInfo.Environment, provider.ServiceInfo.AppId,
provider.ServiceInfo.ServiceName, provider.ServiceInfo.Version,
+ request.ProviderInstanceId)
+ }
+
+ domain := util.ParseDomain(ctx)
+ project := util.ParseProject(ctx)
+ filter = bson.M{
+ ColumnDomain: domain,
+ ColumnProject: project,
+ StringBuilder([]string{ColumnInstanceInfo, ColumnServiceID}):
request.ProviderServiceId}
+ findOneRes, err := client.GetMongoClient().FindOne(ctx,
CollectionInstance, filter)
+ if err != nil {
+ mes := fmt.Errorf("%s failed, provider instance does not
exist", findFlag())
+ log.Error("FindInstances.GetWithProviderID failed", err)
+ return &pb.GetOneInstanceResponse{
+ Response: pb.CreateResponse(pb.ErrInstanceNotExists,
mes.Error()),
+ }, nil
+ }
+ var instance Instance
+ err = findOneRes.Decode(&instance)
+ if err != nil {
+ log.Error(fmt.Sprintf("FindInstances.GetWithProviderID failed
%s failed", findFlag()), err)
+ return &pb.GetOneInstanceResponse{
+ Response: pb.CreateResponse(pb.ErrInternal,
err.Error()),
+ }, err
+ }
+
+ return &pb.GetOneInstanceResponse{
+ Response: pb.CreateResponse(pb.ResponseSuccess, "Get instance
successfully."),
+ Instance: instance.InstanceInfo,
+ }, nil
}
func (ds *DataSource) GetInstances(ctx context.Context, request
*pb.GetInstancesRequest) (*pb.GetInstancesResponse, error) {
- return &pb.GetInstancesResponse{}, nil
+ service := &Service{}
+ var err error
+
+ if len(request.ConsumerServiceId) > 0 {
+ filter := GeneratorServiceFilter(ctx, request.ConsumerServiceId)
+ service, err = GetService(ctx, filter)
+ if err != nil {
+ log.Error(fmt.Sprintf("get consumer failed, consumer %s
find provider %sinstances",
+ request.ConsumerServiceId,
request.ProviderServiceId), err)
+ return &pb.GetInstancesResponse{
+ Response: pb.CreateResponse(pb.ErrInternal,
err.Error()),
+ }, err
+ }
+ if service == nil {
+ log.Error(fmt.Sprintf("consumer does not exist,
consumer %s find provider %s instances",
+ request.ConsumerServiceId,
request.ProviderServiceId), err)
+ return &pb.GetInstancesResponse{
+ Response:
pb.CreateResponse(pb.ErrServiceNotExists,
+ fmt.Sprintf("Consumer[%s] does not
exist.", request.ConsumerServiceId)),
+ }, nil
+ }
+ }
+
+ filter := GeneratorServiceFilter(ctx, request.ProviderServiceId)
+ provider, err := GetService(ctx, filter)
+ if err != nil {
+ log.Error(fmt.Sprintf("get provider failed, consumer %s find
provider instances %s",
+ request.ConsumerServiceId, request.ProviderServiceId),
err)
+ return &pb.GetInstancesResponse{
+ Response: pb.CreateResponse(pb.ErrInternal,
err.Error()),
+ }, err
+ }
+ if provider == nil {
+ log.Error(fmt.Sprintf("provider does not exist, consumer %s
find provider %s instances",
+ request.ConsumerServiceId, request.ProviderServiceId),
err)
+ return &pb.GetInstancesResponse{
+ Response: pb.CreateResponse(pb.ErrServiceNotExists,
+ fmt.Sprintf("Provider[%s] does not exist.",
request.ProviderServiceId)),
+ }, nil
+ }
+
+ findFlag := fmt.Sprintf("Consumer[%s][%s/%s/%s/%s] find
provider[%s][%s/%s/%s/%s] instances",
+ request.ConsumerServiceId, service.ServiceInfo.Environment,
service.ServiceInfo.AppId, service.ServiceInfo.ServiceName,
service.ServiceInfo.Version,
+ provider.ServiceInfo.ServiceId,
provider.ServiceInfo.Environment, provider.ServiceInfo.AppId,
provider.ServiceInfo.ServiceName, provider.ServiceInfo.Version)
+
+ domain := util.ParseDomain(ctx)
+ project := util.ParseProject(ctx)
+ filter = bson.M{
+ ColumnDomain: domain,
+ ColumnProject: project,
+ StringBuilder([]string{ColumnInstanceInfo, ColumnServiceID}):
request.ProviderServiceId}
+ resp, err := client.GetMongoClient().Find(ctx, CollectionInstance,
filter)
+ if err != nil {
+ log.Error(fmt.Sprintf("FindInstancesCache.Get failed %s
failed", findFlag), err)
+ return &pb.GetInstancesResponse{
+ Response: pb.CreateResponse(pb.ErrInternal,
err.Error()),
+ }, err
+ }
+ if resp == nil {
+ mes := fmt.Errorf("%s failed, provider does not exist",
findFlag)
+ log.Error("FindInstancesCache.Get failed", mes)
+ return &pb.GetInstancesResponse{
+ Response: pb.CreateResponse(pb.ErrServiceNotExists,
mes.Error()),
+ }, nil
+ }
+
+ var instances []*pb.MicroServiceInstance
+ for resp.Next(ctx) {
+ var instance Instance
+ err := resp.Decode(&instance)
+ if err != nil {
+ log.Error(fmt.Sprintf("FindInstances.GetWithProviderID
failed %s failed", findFlag), err)
+ return &pb.GetInstancesResponse{
+ Response: pb.CreateResponse(pb.ErrInternal,
err.Error()),
+ }, err
+ }
+ instances = append(instances, instance.InstanceInfo)
+ }
+
+ return &pb.GetInstancesResponse{
+ Response: pb.CreateResponse(pb.ResponseSuccess, "Query service
instances successfully."),
+ Instances: instances,
+ }, nil
}
// GetProviderInstances returns instances under the specified domain
func (ds *DataSource) GetProviderInstances(ctx context.Context, request
*pb.GetProviderInstancesRequest) (instances []*pb.MicroServiceInstance, rev
string, err error) {
- return nil, "", nil
+ domain := util.ParseDomain(ctx)
+ project := util.ParseProject(ctx)
+ filter := bson.M{
+ ColumnDomain: domain,
+ ColumnProject: project,
+ StringBuilder([]string{ColumnInstanceInfo, ColumnServiceID}):
request.ProviderServiceId}
+
+ findRes, err := client.GetMongoClient().Find(ctx, CollectionInstance,
filter)
+ if err != nil {
+ return
+ }
+
+ for findRes.Next(ctx) {
+ var mongoInstance Instance
+ err := findRes.Decode(&mongoInstance)
+ if err == nil {
+ instances = append(instances,
mongoInstance.InstanceInfo)
+ }
+ }
+
+ return instances, "", nil
}
func (ds *DataSource) GetAllInstances(ctx context.Context, request
*pb.GetAllInstancesRequest) (*pb.GetAllInstancesResponse, error) {
- return &pb.GetAllInstancesResponse{}, nil
+
+ domain := util.ParseDomain(ctx)
+ project := util.ParseProject(ctx)
+
+ filter := bson.M{ColumnDomain: domain, ColumnProject: project}
+
+ findRes, err := client.GetMongoClient().Find(ctx, CollectionInstance,
filter)
+ if err != nil {
+ return nil, err
+ }
+ resp := &pb.GetAllInstancesResponse{
+ Response: pb.CreateResponse(pb.ResponseSuccess, "Get all
instances successfully"),
+ }
+
+ for findRes.Next(ctx) {
+ var instance Instance
+ err := findRes.Decode(&instance)
+ if err != nil {
+ return &pb.GetAllInstancesResponse{
+ Response: pb.CreateResponse(pb.ErrInternal,
err.Error()),
+ }, err
+ }
+ resp.Instances = append(resp.Instances, instance.InstanceInfo)
+ }
+
+ return resp, nil
}
func (ds *DataSource) BatchGetProviderInstances(ctx context.Context, request
*pb.BatchGetInstancesRequest) (instances []*pb.MicroServiceInstance, rev
string, err error) {
- return nil, "", nil
+ if request == nil || len(request.ServiceIds) == 0 {
+ return nil, "", ErrInvalidParamBatchGetInstancesRequest
+ }
+
+ domain := util.ParseDomain(ctx)
+ project := util.ParseProject(ctx)
+
+ for _, providerServiceID := range request.ServiceIds {
+ filter := bson.M{
+ ColumnDomain: domain,
+ ColumnProject: project,
+ StringBuilder([]string{ColumnInstanceInfo,
ColumnServiceID}): providerServiceID}
+ findRes, err := client.GetMongoClient().Find(ctx,
CollectionInstance, filter)
+ if err != nil {
+ return instances, "", nil
+ }
+
+ for findRes.Next(ctx) {
+ var mongoInstance Instance
+ err := findRes.Decode(&mongoInstance)
+ if err == nil {
+ instances = append(instances,
mongoInstance.InstanceInfo)
+ }
+ }
+ }
+
+ return instances, "", nil
}
// FindInstances returns instances under the specified domain
func (ds *DataSource) FindInstances(ctx context.Context, request
*pb.FindInstancesRequest) (*pb.FindInstancesResponse, error) {
- return &pb.FindInstancesResponse{}, nil
+ provider := &pb.MicroServiceKey{
+ Tenant: util.ParseTargetDomainProject(ctx),
+ Environment: request.Environment,
+ AppId: request.AppId,
+ ServiceName: request.ServiceName,
+ Alias: request.ServiceName,
+ Version: request.VersionRule,
+ }
+
+ return ds.findInstance(ctx, request, provider)
}
func (ds *DataSource) UpdateInstanceStatus(ctx context.Context, request
*pb.UpdateInstanceStatusRequest) (*pb.UpdateInstanceStatusResponse, error) {
- return &pb.UpdateInstanceStatusResponse{}, nil
+ updateStatusFlag := util.StringJoin([]string{request.ServiceId,
request.InstanceId, request.Status}, "/")
+
+ // todo finish get instance
+ instance, err := GetInstance(ctx, request.ServiceId, request.InstanceId)
+ if err != nil {
+ log.Error(fmt.Sprintf("update instance %s status failed",
updateStatusFlag), err)
+ return &pb.UpdateInstanceStatusResponse{
+ Response: pb.CreateResponse(pb.ErrInternal,
err.Error()),
+ }, err
+ }
+ if instance == nil {
+ log.Error(fmt.Sprintf("update instance %s status failed,
instance does not exist", updateStatusFlag), err)
+ return &pb.UpdateInstanceStatusResponse{
+ Response: pb.CreateResponse(pb.ErrInstanceNotExists,
"Service instance does not exist."),
+ }, nil
+ }
+
+ copyInstanceRef := *instance
+ copyInstanceRef.InstanceInfo.Status = request.Status
+
+ if err := UpdateInstanceS(ctx, copyInstanceRef.InstanceInfo); err !=
nil {
+ log.Error(fmt.Sprintf("update instance %s status failed",
updateStatusFlag), err)
+ resp := &pb.UpdateInstanceStatusResponse{
+ Response: pb.CreateResponseWithSCErr(err),
+ }
+ if err.InternalError() {
+ return resp, err
+ }
+ return resp, nil
+ }
+
+ log.Infof("update instance[%s] status successfully", updateStatusFlag)
+ return &pb.UpdateInstanceStatusResponse{
+ Response: pb.CreateResponse(pb.ResponseSuccess, "Update service
instance status successfully."),
+ }, nil
}
func (ds *DataSource) UpdateInstanceProperties(ctx context.Context, request
*pb.UpdateInstancePropsRequest) (*pb.UpdateInstancePropsResponse, error) {
- return &pb.UpdateInstancePropsResponse{}, nil
+ instanceFlag := util.StringJoin([]string{request.ServiceId,
request.InstanceId}, "/")
+
+ instance, err := GetInstance(ctx, request.ServiceId, request.InstanceId)
+ if err != nil {
+ log.Error(fmt.Sprintf("update instance %s properties failed",
instanceFlag), err)
+ return &pb.UpdateInstancePropsResponse{
+ Response: pb.CreateResponse(pb.ErrInternal,
err.Error()),
+ }, err
+ }
+ if instance == nil {
+ log.Error(fmt.Sprintf("update instance %s properties failed,
instance does not exist", instanceFlag), err)
+ return &pb.UpdateInstancePropsResponse{
+ Response: pb.CreateResponse(pb.ErrInstanceNotExists,
"Service instance does not exist."),
+ }, nil
+ }
+
+ copyInstanceRef := *instance
+ copyInstanceRef.InstanceInfo.Properties = request.Properties
+
+ // todo finish update instance
+ if err := UpdateInstanceP(ctx, copyInstanceRef.InstanceInfo); err !=
nil {
+ log.Error(fmt.Sprintf("update instance %s properties failed",
instanceFlag), err)
+ resp := &pb.UpdateInstancePropsResponse{
+ Response: pb.CreateResponseWithSCErr(err),
+ }
+ if err.InternalError() {
+ return resp, err
+ }
+ return resp, nil
+ }
+
+ log.Infof("update instance[%s] properties successfully", instanceFlag)
+ return &pb.UpdateInstancePropsResponse{
+ Response: pb.CreateResponse(pb.ResponseSuccess, "Update service
instance properties successfully."),
+ }, nil
}
func (ds *DataSource) UnregisterInstance(ctx context.Context, request
*pb.UnregisterInstanceRequest) (*pb.UnregisterInstanceResponse, error) {
- return &pb.UnregisterInstanceResponse{}, nil
+ remoteIP := util.GetIPFromContext(ctx)
+ serviceID := request.ServiceId
+ instanceID := request.InstanceId
+
+ instanceFlag := util.StringJoin([]string{serviceID, instanceID}, "/")
+
+ domain := util.ParseDomain(ctx)
+ project := util.ParseProject(ctx)
+ filter := bson.M{
+ ColumnDomain: domain,
+ ColumnProject: project,
+ StringBuilder([]string{ColumnInstanceInfo, ColumnServiceID}):
serviceID,
+ StringBuilder([]string{ColumnInstanceInfo, ColumnInstanceID}):
instanceID}
+ _, err := client.GetMongoClient().Delete(ctx, CollectionInstance,
filter)
+ if err != nil {
+ log.Error(fmt.Sprintf("unregister instance failed, instance %s,
operator %s revoke instance failed", instanceFlag, remoteIP), err)
+ return &pb.UnregisterInstanceResponse{
+ Response: pb.CreateResponse(pb.ErrInternal, "delete
instance failed"),
+ }, err
+ }
+
+ log.Infof("unregister instance[%s], operator %s", instanceFlag,
remoteIP)
+ return &pb.UnregisterInstanceResponse{
+ Response: pb.CreateResponse(pb.ResponseSuccess, "Unregister
service instance successfully."),
+ }, nil
}
func (ds *DataSource) Heartbeat(ctx context.Context, request
*pb.HeartbeatRequest) (*pb.HeartbeatResponse, error) {
- return heartbeat.Instance().Heartbeat(ctx, request)
+ remoteIP := util.GetIPFromContext(ctx)
+ instanceFlag := util.StringJoin([]string{request.ServiceId,
request.InstanceId}, "/")
+ err := KeepAliveLease(ctx, request)
+ if err != nil {
+ log.Error(fmt.Sprintf("heartbeat failed, instance %s operator
%s", instanceFlag, remoteIP), err)
+ resp := &pb.HeartbeatResponse{
+ Response: pb.CreateResponseWithSCErr(err),
+ }
+ if err.InternalError() {
+ return resp, err
+ }
+ return resp, nil
+ }
+ return &pb.HeartbeatResponse{
+ Response: pb.CreateResponse(pb.ResponseSuccess,
+ "Update service instance heartbeat successfully."),
+ }, nil
}
func (ds *DataSource) HeartbeatSet(ctx context.Context, request
*pb.HeartbeatSetRequest) (*pb.HeartbeatSetResponse, error) {
- return &pb.HeartbeatSetResponse{}, nil
+ domainProject := util.ParseDomainProject(ctx)
+
+ heartBeatCount := len(request.Instances)
+ existFlag := make(map[string]bool, heartBeatCount)
+ instancesHbRst := make(chan *pb.InstanceHbRst, heartBeatCount)
+ noMultiCounter := 0
+
+ for _, heartbeatElement := range request.Instances {
+ if _, ok :=
existFlag[heartbeatElement.ServiceId+heartbeatElement.InstanceId]; ok {
+ log.Warnf("instance[%s/%s] is duplicate request
heartbeat set",
+ heartbeatElement.ServiceId,
heartbeatElement.InstanceId)
+ continue
+ } else {
+
existFlag[heartbeatElement.ServiceId+heartbeatElement.InstanceId] = true
+ noMultiCounter++
+ }
+ gopool.Go(getHeartbeatFunc(ctx, domainProject, instancesHbRst,
heartbeatElement))
+ }
+
+ count := 0
+ successFlag := false
+ failFlag := false
+ instanceHbRstArr := make([]*pb.InstanceHbRst, 0, heartBeatCount)
+
+ for hbRst := range instancesHbRst {
+ count++
+ if len(hbRst.ErrMessage) != 0 {
+ failFlag = true
+ } else {
+ successFlag = true
+ }
+ instanceHbRstArr = append(instanceHbRstArr, hbRst)
+ if count == noMultiCounter {
+ close(instancesHbRst)
+ }
+ }
+
+ if !failFlag && successFlag {
+ log.Infof("batch update heartbeats[%d] successfully", count)
+ return &pb.HeartbeatSetResponse{
+ Response: pb.CreateResponse(pb.ResponseSuccess,
"Heartbeat set successfully."),
+ Instances: instanceHbRstArr,
+ }, nil
+ }
+
+ log.Info(fmt.Sprintf("batch update heartbeats failed %v",
request.Instances))
+ return &pb.HeartbeatSetResponse{
+ Response: pb.CreateResponse(pb.ErrInstanceNotExists,
"Heartbeat set failed."),
+ Instances: instanceHbRstArr,
+ }, nil
}
func (ds *DataSource) BatchFind(ctx context.Context, request
*pb.BatchFindInstancesRequest) (*pb.BatchFindInstancesResponse, error) {
- return &pb.BatchFindInstancesResponse{}, nil
+ response := &pb.BatchFindInstancesResponse{
+ Response: pb.CreateResponse(pb.ResponseSuccess, "Batch query
service instances successfully."),
+ }
+
+ var err error
+
+ response.Services, err = ds.batchFindServices(ctx, request)
+ if err != nil {
+ return &pb.BatchFindInstancesResponse{
+ Response: pb.CreateResponse(pb.ErrInternal,
err.Error()),
+ }, err
+ }
+
+ response.Instances, err = ds.batchFindInstances(ctx, request)
+ if err != nil {
+ return &pb.BatchFindInstancesResponse{
+ Response: pb.CreateResponse(pb.ErrInternal,
err.Error()),
+ }, err
+ }
+
+ return response, nil
+}
+
+func registryInstance(ctx context.Context, request
*pb.RegisterInstanceRequest) (*pb.RegisterInstanceResponse, error) {
+ domain := util.ParseDomain(ctx)
+ project := util.ParseProject(ctx)
+ remoteIP := util.GetIPFromContext(ctx)
+ instance := request.Instance
+ instanceID := instance.InstanceId
+ data := &Instance{
+ Domain: domain,
+ Project: project,
+ RefreshTime: time.Now(),
+ InstanceInfo: instance,
+ }
+
+ instanceFlag := fmt.Sprintf("endpoints %v, host '%s', serviceID %s",
+ instance.Endpoints, instance.HostName, instance.ServiceId)
+
+ insertRes, err := client.GetMongoClient().Insert(ctx,
CollectionInstance, data)
+ if err != nil {
+ log.Error(fmt.Sprintf("register instance failed %s instanceID
%s operator %s", instanceFlag, instanceID, remoteIP), err)
+ return &pb.RegisterInstanceResponse{
+ Response: pb.CreateResponse(pb.ErrUnavailableBackend,
err.Error()),
+ }, err
+ }
+
+ log.Infof("register instance %s, instanceID %s, operator %s",
+ instanceFlag, insertRes.InsertedID, remoteIP)
+ return &pb.RegisterInstanceResponse{
+ Response: pb.CreateResponse(pb.ResponseSuccess, "Register
service instance successfully."),
+ InstanceId: instanceID,
+ }, nil
+}
+
+// findInstance looks up provider instances on behalf of a consumer.
+//
+// When request.ConsumerServiceId is set, the consumer service is loaded first
+// and its environment is copied onto provider; a missing consumer yields an
+// ErrServiceNotExists response with a nil error. The instance query itself
+// filters on the caller's domain and project only.
+// NOTE(review): the provider's service name and version are not part of the
+// instance filter, so every instance of the tenant is returned — confirm
+// whether that is intended.
+// When a consumer is given and at least one instance was found, the provider
+// key is rebuilt from the first instance's service and AddServiceVersionRule
+// is called for the pair.
+//
+// Backend failures are reported as an ErrInternal response together with the
+// error.
+func (ds *DataSource) findInstance(ctx context.Context, request *pb.FindInstancesRequest, provider *pb.MicroServiceKey) (*pb.FindInstancesResponse, error) {
+	var err error
+	domainProject := util.ParseDomainProject(ctx)
+	service := &Service{ServiceInfo: &pb.MicroService{Environment: request.Environment}}
+	if len(request.ConsumerServiceId) > 0 {
+		filter := GeneratorServiceFilter(ctx, request.ConsumerServiceId)
+		service, err = GetService(ctx, filter)
+		if err != nil {
+			log.Error(fmt.Sprintf("get consumer failed, consumer %s find provider %s/%s/%s/%s",
+				request.ConsumerServiceId, request.Environment, request.AppId, request.ServiceName, request.VersionRule), err)
+			return &pb.FindInstancesResponse{
+				Response: pb.CreateResponse(pb.ErrInternal, err.Error()),
+			}, err
+		}
+		if service == nil {
+			log.Error(fmt.Sprintf("consumer does not exist, consumer %s find provider %s/%s/%s/%s",
+				request.ConsumerServiceId, request.Environment, request.AppId, request.ServiceName, request.VersionRule), err)
+			return &pb.FindInstancesResponse{
+				Response: pb.CreateResponse(pb.ErrServiceNotExists,
+					fmt.Sprintf("Consumer[%s] does not exist.", request.ConsumerServiceId)),
+			}, nil
+		}
+		provider.Environment = service.ServiceInfo.Environment
+	}
+
+	// provider is not a shared micro-service,
+	// only allow shared micro-service instances found request different domains.
+	ctx = util.SetTargetDomainProject(ctx, util.ParseDomain(ctx), util.ParseProject(ctx))
+	provider.Tenant = util.ParseTargetDomainProject(ctx)
+
+	findFlag := fmt.Sprintf("Consumer[%s][%s/%s/%s/%s] find provider[%s/%s/%s/%s]",
+		request.ConsumerServiceId, service.ServiceInfo.Environment, service.ServiceInfo.AppId, service.ServiceInfo.ServiceName, service.ServiceInfo.Version,
+		provider.Environment, provider.AppId, provider.ServiceName, provider.Version)
+
+	domain := util.ParseDomain(ctx)
+	project := util.ParseProject(ctx)
+	resp, err := client.GetMongoClient().Find(ctx, CollectionInstance, bson.M{ColumnDomain: domain, ColumnProject: project})
+	if err != nil {
+		log.Error(fmt.Sprintf("FindInstancesCache.Get failed %s failed", findFlag), err)
+		return &pb.FindInstancesResponse{
+			Response: pb.CreateResponse(pb.ErrInternal, err.Error()),
+		}, err
+	}
+	if resp == nil {
+		mes := fmt.Errorf("%s failed, provider does not exist", findFlag)
+		log.Error("FindInstancesCache.Get failed", mes)
+		return &pb.FindInstancesResponse{
+			Response: pb.CreateResponse(pb.ErrServiceNotExists, mes.Error()),
+		}, nil
+	}
+	// Release the cursor on every return path; it was previously leaked.
+	defer func() {
+		if closeErr := resp.Close(ctx); closeErr != nil {
+			log.Error(fmt.Sprintf("close instance cursor failed, %s", findFlag), closeErr)
+		}
+	}()
+
+	var instances []*pb.MicroServiceInstance
+	for resp.Next(ctx) {
+		var instance Instance
+		if err = resp.Decode(&instance); err != nil {
+			log.Error(fmt.Sprintf("FindInstances.GetWithProviderID failed %s failed", findFlag), err)
+			return &pb.FindInstancesResponse{
+				Response: pb.CreateResponse(pb.ErrInternal, err.Error()),
+			}, err
+		}
+		instances = append(instances, instance.InstanceInfo)
+	}
+	// Next returns false both at end of data and on failure; tell them apart
+	// so a broken iteration is not reported as a (possibly partial) success.
+	if err = resp.Err(); err != nil {
+		log.Error(fmt.Sprintf("FindInstances.GetWithProviderID failed %s failed", findFlag), err)
+		return &pb.FindInstancesResponse{
+			Response: pb.CreateResponse(pb.ErrInternal, err.Error()),
+		}, err
+	}
+
+	// add dependency queue
+	if len(request.ConsumerServiceId) > 0 && len(instances) > 0 {
+		provider, err = ds.reshapeProviderKey(ctx, provider, instances[0].ServiceId)
+		if err != nil {
+			// Return a populated response like every other failure path.
+			log.Error(fmt.Sprintf("reshape provider key failed, %s", findFlag), err)
+			return &pb.FindInstancesResponse{
+				Response: pb.CreateResponse(pb.ErrInternal, err.Error()),
+			}, err
+		}
+		if provider == nil {
+			mes := fmt.Errorf("%s failed, provider does not exist", findFlag)
+			log.Error("AddServiceVersionRule failed", mes)
+			return &pb.FindInstancesResponse{
+				Response: pb.CreateResponse(pb.ErrServiceNotExists, mes.Error()),
+			}, nil
+		}
+		if err = AddServiceVersionRule(ctx, domainProject, service.ServiceInfo, provider); err != nil {
+			log.Error(fmt.Sprintf("AddServiceVersionRule failed %s failed", findFlag), err)
+			return &pb.FindInstancesResponse{
+				Response: pb.CreateResponse(pb.ErrInternal, err.Error()),
+			}, err
+		}
+	}
+
+	return &pb.FindInstancesResponse{
+		Response:  pb.CreateResponse(pb.ResponseSuccess, "Query service instances successfully."),
+		Instances: instances,
+	}, nil
+}
+
+// reshapeProviderKey rebuilds the provider key from the service identified by
+// providerID while keeping the version rule of the key that was passed in.
+// It returns (nil, err) when GetService yields no service, where err is
+// whatever GetService returned (possibly nil).
+func (ds *DataSource) reshapeProviderKey(ctx context.Context, provider *pb.MicroServiceKey, providerID string) (*pb.MicroServiceKey, error) {
+	// Maintains the version rule: the requested service name may be an alias,
+	// so the service is fetched again by ID.
+	svc, err := GetService(ctx, GeneratorServiceFilter(ctx, providerID))
+	if svc == nil {
+		return nil, err
+	}
+
+	reshaped := pb.MicroServiceToKey(provider.Tenant, svc.ServiceInfo)
+	reshaped.Version = provider.Version
+	return reshaped, nil
+}
+
+// AddServiceVersionRule is currently a placeholder: it ignores all of its
+// arguments, performs no work and always returns nil.
+func AddServiceVersionRule(ctx context.Context, domainProject string, consumer
*pb.MicroService, provider *pb.MicroServiceKey) error {
+ return nil
+}
+
+// GetInstance loads the instance identified by serviceID and instanceID within
+// the domain and project carried by ctx.
+// It returns (nil, nil) when the single-result lookup reports any error, and a
+// non-nil error only when the query call itself or the decoding fails.
+func GetInstance(ctx context.Context, serviceID string, instanceID string) (*Instance, error) {
+	query := bson.M{
+		ColumnDomain:  util.ParseDomain(ctx),
+		ColumnProject: util.ParseProject(ctx),
+		StringBuilder([]string{ColumnInstanceInfo, ColumnServiceID}):  serviceID,
+		StringBuilder([]string{ColumnInstanceInfo, ColumnInstanceID}): instanceID,
+	}
+	result, err := client.GetMongoClient().FindOne(ctx, CollectionInstance, query)
+	if err != nil {
+		return nil, err
+	}
+	if result.Err() != nil {
+		// Treated as "no such instance" rather than as a database failure.
+		return nil, nil
+	}
+
+	var found *Instance
+	if err = result.Decode(&found); err != nil {
+		return nil, err
+	}
+	return found, nil
+}
+
+// UpdateInstanceS writes the status of the given instance together with a
+// fresh modification timestamp (Unix seconds, as a decimal string).
+// It returns an ErrUnavailableBackend error when the update call fails.
+//
+// NOTE(review): the update targets "instance.motTimestamp" and
+// "instance.status", while the filter addresses fields under
+// ColumnInstanceInfo — "motTimestamp" looks like a typo; verify both keys
+// against the stored document schema.
+func UpdateInstanceS(ctx context.Context, instance *pb.MicroServiceInstance) *pb.Error {
+	selector := bson.M{
+		ColumnDomain:  util.ParseDomain(ctx),
+		ColumnProject: util.ParseProject(ctx),
+		StringBuilder([]string{ColumnInstanceInfo, ColumnServiceID}):  instance.ServiceId,
+		StringBuilder([]string{ColumnInstanceInfo, ColumnInstanceID}): instance.InstanceId,
+	}
+	changes := bson.M{"$set": bson.M{
+		"instance.motTimestamp": strconv.FormatInt(time.Now().Unix(), 10),
+		"instance.status":       instance.Status,
+	}}
+
+	if _, err := client.GetMongoClient().Update(ctx, CollectionInstance, selector, changes); err != nil {
+		return pb.NewError(pb.ErrUnavailableBackend, err.Error())
+	}
+	return nil
+}
+
+// UpdateInstanceP writes the properties of the given instance together with a
+// fresh modification timestamp (Unix seconds, as a decimal string).
+// It returns an ErrUnavailableBackend error when the update call fails.
+//
+// NOTE(review): the update targets "instance.motTimestamp" and
+// "instance.properties", while the filter addresses fields under
+// ColumnInstanceInfo — "motTimestamp" looks like a typo; verify both keys
+// against the stored document schema.
+func UpdateInstanceP(ctx context.Context, instance *pb.MicroServiceInstance) *pb.Error {
+	selector := bson.M{
+		ColumnDomain:  util.ParseDomain(ctx),
+		ColumnProject: util.ParseProject(ctx),
+		StringBuilder([]string{ColumnInstanceInfo, ColumnServiceID}):  instance.ServiceId,
+		StringBuilder([]string{ColumnInstanceInfo, ColumnInstanceID}): instance.InstanceId,
+	}
+	changes := bson.M{"$set": bson.M{
+		"instance.motTimestamp": strconv.FormatInt(time.Now().Unix(), 10),
+		"instance.properties":   instance.Properties,
+	}}
+
+	if _, err := client.GetMongoClient().Update(ctx, CollectionInstance, selector, changes); err != nil {
+		return pb.NewError(pb.ErrUnavailableBackend, err.Error())
+	}
+	return nil
+}
+
+// KeepAliveLease forwards the heartbeat request to the heartbeat plugin.
+// Any failure is reported as an ErrInstanceNotExists error; the plugin's
+// response value is discarded.
+func KeepAliveLease(ctx context.Context, request *pb.HeartbeatRequest) *pb.Error {
+	if _, hbErr := heartbeat.Instance().Heartbeat(ctx, request); hbErr != nil {
+		return pb.NewError(pb.ErrInstanceNotExists, hbErr.Error())
+	}
+	return nil
+}
+
+// getHeartbeatFunc returns a task that sends one heartbeat for element and
+// always pushes exactly one result onto instancesHbRst; ErrMessage is filled
+// in when the heartbeat fails. The task uses the ctx captured here and ignores
+// the context it is invoked with. domainProject is not used.
+func getHeartbeatFunc(ctx context.Context, domainProject string, instancesHbRst chan<- *pb.InstanceHbRst, element *pb.HeartbeatSetElement) func(context.Context) {
+	return func(context.Context) {
+		result := &pb.InstanceHbRst{
+			ServiceId:  element.ServiceId,
+			InstanceId: element.InstanceId,
+			ErrMessage: "",
+		}
+
+		hbErr := KeepAliveLease(ctx, &pb.HeartbeatRequest{
+			InstanceId: element.InstanceId,
+			ServiceId:  element.ServiceId,
+		})
+		if hbErr != nil {
+			result.ErrMessage = hbErr.Error()
+			log.Error(fmt.Sprintf("heartbeat set failed %s %s", element.ServiceId, element.InstanceId), hbErr)
+		}
+		instancesHbRst <- result
+	}
+}
+
+// batchFindServices runs FindInstances once per entry of request.Services and
+// sorts the outcomes into updated, not-modified and failed results, with
+// failures grouped by response code. It returns (nil, nil) when the request
+// holds no service queries, and stops at the first FindInstances error.
+func (ds *DataSource) batchFindServices(ctx context.Context, request *pb.BatchFindInstancesRequest) (*pb.BatchFindResult, error) {
+	if len(request.Services) == 0 {
+		return nil, nil
+	}
+	baseCtx := util.CloneContext(ctx)
+
+	result := &pb.BatchFindResult{}
+	failuresByCode := make(map[int32]*pb.FindFailedResult)
+	for i, item := range request.Services {
+		// Each lookup carries the revision supplied for that entry.
+		itemCtx := util.SetContext(baseCtx, util.CtxRequestRevision, item.Rev)
+		resp, err := ds.FindInstances(itemCtx, &pb.FindInstancesRequest{
+			ConsumerServiceId: request.ConsumerServiceId,
+			AppId:             item.Service.AppId,
+			ServiceName:       item.Service.ServiceName,
+			VersionRule:       item.Service.Version,
+			Environment:       item.Service.Environment,
+		})
+		if err != nil {
+			return nil, err
+		}
+
+		code := resp.Response.GetCode()
+		group, known := failuresByCode[code]
+		AppendFindResponse(itemCtx, int64(i), resp.Response, resp.Instances,
+			&result.Updated, &result.NotModified, &group)
+		// Remember a failure group the first time its code shows up.
+		if !known && group != nil {
+			failuresByCode[code] = group
+		}
+	}
+	for _, group := range failuresByCode {
+		result.Failed = append(result.Failed, group)
+	}
+	return result, nil
+}
+
+// batchFindInstances runs GetInstance once per entry of request.Instances and
+// sorts the outcomes into updated, not-modified and failed results, with
+// failures grouped by response code. It returns (nil, nil) when the request
+// holds no instance queries, and stops at the first GetInstance error.
+func (ds *DataSource) batchFindInstances(ctx context.Context, request *pb.BatchFindInstancesRequest) (*pb.BatchFindResult, error) {
+	if len(request.Instances) == 0 {
+		return nil, nil
+	}
+	// can not find the shared provider instances
+	baseCtx := util.SetTargetDomainProject(util.CloneContext(ctx), util.ParseDomain(ctx), util.ParseProject(ctx))
+
+	result := &pb.BatchFindResult{}
+	failuresByCode := make(map[int32]*pb.FindFailedResult)
+	for i, item := range request.Instances {
+		// Each lookup carries the revision supplied for that entry.
+		itemCtx := util.SetContext(baseCtx, util.CtxRequestRevision, item.Rev)
+		resp, err := ds.GetInstance(itemCtx, &pb.GetOneInstanceRequest{
+			ConsumerServiceId:  request.ConsumerServiceId,
+			ProviderServiceId:  item.Instance.ServiceId,
+			ProviderInstanceId: item.Instance.InstanceId,
+		})
+		if err != nil {
+			return nil, err
+		}
+
+		code := resp.Response.GetCode()
+		group, known := failuresByCode[code]
+		AppendFindResponse(itemCtx, int64(i), resp.Response, []*pb.MicroServiceInstance{resp.Instance},
+			&result.Updated, &result.NotModified, &group)
+		// Remember a failure group the first time its code shows up.
+		if !known && group != nil {
+			failuresByCode[code] = group
+		}
+	}
+	for _, group := range failuresByCode {
+		result.Failed = append(result.Failed, group)
+	}
+	return result, nil
+}
+
+// AppendFindResponse files one lookup outcome under exactly one of three
+// result lists:
+//   - a non-success response appends index to *failedResult, creating it from
+//     the response code and message if it is still nil;
+//   - a success whose request revision in ctx is non-empty and equal to the
+//     response revision in ctx appends index to *notModifiedResult;
+//   - any other success appends a FindResult with the instances and the
+//     response revision to *updatedResult.
+func AppendFindResponse(ctx context.Context, index int64, resp *pb.Response, instances []*pb.MicroServiceInstance,
+	updatedResult *[]*pb.FindResult, notModifiedResult *[]int64, failedResult **pb.FindFailedResult) {
+	code := resp.GetCode()
+	if code != pb.ResponseSuccess {
+		group := *failedResult
+		if group == nil {
+			group = &pb.FindFailedResult{
+				Error: pb.NewError(code, resp.GetMessage()),
+			}
+			*failedResult = group
+		}
+		group.Indexes = append(group.Indexes, index)
+		return
+	}
+
+	// A missing or non-string revision value reads as the empty string.
+	requestRev, _ := ctx.Value(util.CtxRequestRevision).(string)
+	responseRev, _ := ctx.Value(util.CtxResponseRevision).(string)
+	if requestRev != "" && requestRev == responseRev {
+		*notModifiedResult = append(*notModifiedResult, index)
+		return
+	}
+
+	*updatedResult = append(*updatedResult, &pb.FindResult{
+		Index:     index,
+		Instances: instances,
+		Rev:       responseRev,
+	})
+}
+
+func preProcessRegisterInstance(ctx context.Context, instance
*pb.MicroServiceInstance) *pb.Error {
+ if len(instance.Status) == 0 {
+ instance.Status = pb.MSI_UP
+ }
+
+ if len(instance.InstanceId) == 0 {
+ instance.InstanceId = uuid.Generator().GetInstanceID(ctx)
+ }
+
+ instance.Timestamp = strconv.FormatInt(time.Now().Unix(), 10)
+ instance.ModTimestamp = instance.Timestamp
+
+ // 这里应该根据租约计时
+ renewalInterval := apt.RegistryDefaultLeaseRenewalinterval
+ retryTimes := apt.RegistryDefaultLeaseRetrytimes
+ if instance.HealthCheck == nil {
+ instance.HealthCheck = &pb.HealthCheck{
+ Mode: pb.CHECK_BY_HEARTBEAT,
+ Interval: renewalInterval,
+ Times: retryTimes,
+ }
+ } else {
+ // Health
check对象仅用于呈现服务健康检查逻辑,如果CHECK_BY_PLATFORM类型,表明由sidecar代发心跳,实例120s超时
+ switch instance.HealthCheck.Mode {
+ case pb.CHECK_BY_HEARTBEAT:
+ d := instance.HealthCheck.Interval *
(instance.HealthCheck.Times + 1)
+ if d <= 0 {
+ return pb.NewError(pb.ErrInvalidParams,
"Invalid 'healthCheck' settings in request body.")
+ }
+ case pb.CHECK_BY_PLATFORM:
+ // 默认120s
+ instance.HealthCheck.Interval = renewalInterval
+ instance.HealthCheck.Times = retryTimes
+ }
+ }
+
+ filter := GeneratorServiceFilter(ctx, instance.ServiceId)
+ microservice, err := GetService(ctx, filter)
+ if microservice == nil || err != nil {
+ return pb.NewError(pb.ErrServiceNotExists, "Invalid 'serviceID'
in request body.")
+ }
+ instance.Version = microservice.ServiceInfo.Version
+ return nil
}
diff --git a/datasource/mongo/ms_test.go b/datasource/mongo/ms_test.go
index 8a2fb9c..5e4254f 100644
--- a/datasource/mongo/ms_test.go
+++ b/datasource/mongo/ms_test.go
@@ -24,11 +24,13 @@ import (
"time"
"github.com/apache/servicecomb-service-center/datasource"
+ "github.com/apache/servicecomb-service-center/datasource/mongo"
"github.com/apache/servicecomb-service-center/datasource/mongo/client"
"github.com/apache/servicecomb-service-center/server/plugin/quota"
pb "github.com/go-chassis/cari/discovery"
"github.com/go-chassis/go-chassis/v2/storage"
"github.com/stretchr/testify/assert"
+ "go.mongodb.org/mongo-driver/bson"
)
func init() {
@@ -185,34 +187,6 @@ func TestGetService(t *testing.T) {
}
-// need mongodb cluster
-//func TestServiceDelete(t *testing.T) {
-// t.Run("delete service by mongo, should pass", func(t *testing.T) {
-// request := &pb.CreateServiceRequest{
-// Service: &pb.MicroService{
-// ServiceId: "ms-service-delete-new-id",
-// ServiceName: "ms-service-delete",
-// AppId: "default",
-// Version: "1.0.4",
-// Level: "BACK",
-// Properties: make(map[string]string),
-// },
-// }
-//
-// resp, err :=
datasource.Instance().RegisterService(getContext(), request)
-// assert.NoError(t, err)
-// assert.Equal(t, resp.Response.GetCode(), pb.ResponseSuccess)
-//
-// res, err :=
datasource.Instance().UnregisterService(getContext(), &pb.DeleteServiceRequest{
-// ServiceId: "ms-service-delete-new-id",
-// Force: false,
-// })
-// fmt.Println(res.Response.Message)
-// assert.NoError(t, err)
-// assert.Equal(t, pb.ResponseSuccess, res.Response.GetCode())
-// })
-//}
-
func TestUpdateService(t *testing.T) {
t.Run("update service by mongo, should pass", func(t *testing.T) {
request := &pb.CreateServiceRequest{
@@ -247,7 +221,7 @@ func TestUpdateService(t *testing.T) {
func TestTagsAdd(t *testing.T) {
// create service
- t.Run("create service", func(t *testing.T) {
+ t.Run("create service, the request is valid, should pass", func(t
*testing.T) {
svc1 := &pb.MicroService{
ServiceId: "service_tag_id",
AppId: "create_tag_group_ms",
@@ -264,7 +238,7 @@ func TestTagsAdd(t *testing.T) {
})
//
- t.Run("the request is valid", func(t *testing.T) {
+ t.Run("create service, the request is valid, should pass", func(t
*testing.T) {
defaultQuota := quota.DefaultTagQuota
tags := make(map[string]string, defaultQuota)
for i := 0; i < defaultQuota; i++ {
@@ -282,7 +256,7 @@ func TestTagsAdd(t *testing.T) {
}
func TestTagsGet(t *testing.T) {
- t.Run("create service and add tags", func(t *testing.T) {
+ t.Run("create service and add tags, the request is valid, should pass",
func(t *testing.T) {
svc := &pb.MicroService{
ServiceId: "get_tag_group_ms_id",
AppId: "get_tag_group_ms",
@@ -318,7 +292,7 @@ func TestTagsGet(t *testing.T) {
}
func TestTagUpdate(t *testing.T) {
- t.Run("add service and add tags", func(t *testing.T) {
+ t.Run("add service and add tags, the request is valid, should pass",
func(t *testing.T) {
svc := &pb.MicroService{
ServiceId: "update_tag_group_ms_id",
AppId: "update_tag_group_ms",
@@ -344,7 +318,7 @@ func TestTagUpdate(t *testing.T) {
assert.Equal(t, pb.ResponseSuccess,
respAddTags.Response.GetCode())
})
- t.Run("the request is valid", func(t *testing.T) {
+ t.Run("the request is valid, should pass", func(t *testing.T) {
resp, err := datasource.Instance().UpdateTag(getContext(),
&pb.UpdateServiceTagRequest{
ServiceId: "update_tag_group_ms_id",
Key: "a",
@@ -356,7 +330,7 @@ func TestTagUpdate(t *testing.T) {
}
func TestTagsDelete(t *testing.T) {
- t.Run("create service and add tags", func(t *testing.T) {
+ t.Run("create service and add tags, the request is valid, should pass",
func(t *testing.T) {
resp, err :=
datasource.Instance().RegisterService(getContext(), &pb.CreateServiceRequest{
Service: &pb.MicroService{
ServiceId: "delete_tag_group_ms_id",
@@ -380,7 +354,7 @@ func TestTagsDelete(t *testing.T) {
assert.NoError(t, err)
assert.Equal(t, pb.ResponseSuccess,
respAddTages.Response.GetCode())
})
- t.Run("the request is valid", func(t *testing.T) {
+ t.Run("the request is valid, should pass", func(t *testing.T) {
resp, err := datasource.Instance().DeleteTags(getContext(),
&pb.DeleteServiceTagsRequest{
ServiceId: "delete_tag_group_ms_id",
Keys: []string{"b"},
@@ -398,7 +372,7 @@ func TestTagsDelete(t *testing.T) {
}
func TestRuleAdd(t *testing.T) {
- t.Run("register service and datasource.Instance()", func(t *testing.T) {
+ t.Run("register service, the request is valid, should pass", func(t
*testing.T) {
respCreateService, err :=
datasource.Instance().RegisterService(getContext(), &pb.CreateServiceRequest{
Service: &pb.MicroService{
ServiceId: "create_rule_group_ms_id",
@@ -412,7 +386,7 @@ func TestRuleAdd(t *testing.T) {
assert.NoError(t, err)
assert.Equal(t, pb.ResponseSuccess,
respCreateService.Response.GetCode())
})
- t.Run("request is valid", func(t *testing.T) {
+ t.Run("register service, the request is valid, should pass", func(t
*testing.T) {
respAddRule, err := datasource.Instance().AddRule(getContext(),
&pb.AddServiceRulesRequest{
ServiceId: "create_rule_group_ms_id",
Rules: []*pb.AddOrUpdateServiceRule{
@@ -429,7 +403,7 @@ func TestRuleAdd(t *testing.T) {
ruleId := respAddRule.RuleIds[0]
assert.NotEqual(t, "", ruleId)
})
- t.Run("request rule is already exist", func(t *testing.T) {
+ t.Run("request rule is already exist, should pass", func(t *testing.T) {
respAddRule, err := datasource.Instance().AddRule(getContext(),
&pb.AddServiceRulesRequest{
ServiceId: "create_rule_group_ms_id",
Rules: []*pb.AddOrUpdateServiceRule{
@@ -448,7 +422,7 @@ func TestRuleAdd(t *testing.T) {
}
func TestRuleGet(t *testing.T) {
- t.Run("register service and rules", func(t *testing.T) {
+ t.Run("register service and rules, the request is valid, should pass",
func(t *testing.T) {
respCreateService, err :=
datasource.Instance().RegisterService(getContext(), &pb.CreateServiceRequest{
Service: &pb.MicroService{
ServiceId: "get_rule_group_ms_id",
@@ -478,7 +452,7 @@ func TestRuleGet(t *testing.T) {
ruleId := respAddRule.RuleIds[0]
assert.NotEqual(t, "", ruleId)
})
- t.Run("get when request is valid", func(t *testing.T) {
+ t.Run("get rule, when request is valid, should pass", func(t
*testing.T) {
respGetRule, err :=
datasource.Instance().GetRules(getContext(), &pb.GetServiceRulesRequest{
ServiceId: "get_rule_group_ms_id",
})
@@ -490,7 +464,7 @@ func TestRuleGet(t *testing.T) {
func TestRuleDelete(t *testing.T) {
var ruleId string
- t.Run("register service and rules", func(t *testing.T) {
+ t.Run("register service and rules, when request is valid, should pass",
func(t *testing.T) {
respCreateService, err :=
datasource.Instance().RegisterService(getContext(), &pb.CreateServiceRequest{
Service: &pb.MicroService{
ServiceId: "delete_rule_group_ms_id",
@@ -519,7 +493,7 @@ func TestRuleDelete(t *testing.T) {
assert.Equal(t, pb.ResponseSuccess,
respAddRule.Response.GetCode())
ruleId = respAddRule.RuleIds[0]
})
- t.Run("delete when request is valid", func(t *testing.T) {
+ t.Run("delete rule, when request is valid, should pass", func(t
*testing.T) {
resp, err := datasource.Instance().DeleteRule(getContext(),
&pb.DeleteServiceRulesRequest{
ServiceId: "delete_rule_group_ms_id",
RuleIds: []string{ruleId},
@@ -538,7 +512,7 @@ func TestRuleDelete(t *testing.T) {
func TestRuleUpdate(t *testing.T) {
var ruleId string
- t.Run("create service and rules", func(t *testing.T) {
+ t.Run("create service and rules, when request is valid, should pass",
func(t *testing.T) {
respCreateService, err :=
datasource.Instance().RegisterService(getContext(), &pb.CreateServiceRequest{
Service: &pb.MicroService{
ServiceId: "update_rule_group_ms_id",
@@ -568,7 +542,7 @@ func TestRuleUpdate(t *testing.T) {
assert.NotEqual(t, "", respAddRule.RuleIds[0])
ruleId = respAddRule.RuleIds[0]
})
- t.Run("update when request is valid", func(t *testing.T) {
+ t.Run("update rule, when request is valid, should pass", func(t
*testing.T) {
resp, err := datasource.Instance().UpdateRule(getContext(),
&pb.UpdateServiceRuleRequest{
ServiceId: "update_rule_group_ms_id",
RuleId: ruleId,
@@ -584,70 +558,500 @@ func TestRuleUpdate(t *testing.T) {
})
}
-// 需要多集群mongo支持
-//func TestSchema(t *testing.T) {
-// t.Run("create a schema in production env", func(t *testing.T) {
-// respCreateService, err :=
datasource.Instance().RegisterService(getContext(), &pb.CreateServiceRequest{
-// Service: &pb.MicroService{
-// ServiceId:
"create_schema_prod_service_ms_id1",
-// AppId: "create_schema_prod_service_ms",
-// ServiceName: "create_schema_service_service_ms",
-// Version: "1.0.0",
-// Level: "FRONT",
-// Status: pb.MS_UP,
-// Environment: pb.ENV_PROD,
-// },
-// })
-// assert.NoError(t, err)
-// assert.Equal(t, pb.ResponseSuccess,
respCreateService.Response.GetCode())
-//
-// respCreateService, err =
datasource.Instance().RegisterService(getContext(), &pb.CreateServiceRequest{
-// Service: &pb.MicroService{
-// ServiceId:
"create_schema_prod_service_ms_id2",
-// AppId: "create_schema_prod_service_ms",
-// ServiceName: "create_schema_service_service_ms",
-// Version: "1.0.1",
-// Level: "FRONT",
-// Schemas: []string{
-// "first_schemaId_service_ms",
-// "second_schemaId_service_ms",
-// },
-// Status: pb.MS_UP,
-// Environment: pb.ENV_PROD,
-// },
-// })
-// assert.NoError(t, err)
-// assert.Equal(t, pb.ResponseSuccess,
respCreateService.Response.GetCode())
-//
-// respModifySchema, err :=
datasource.Instance().ModifySchema(getContext(), &pb.ModifySchemaRequest{
-// ServiceId: "create_schema_prod_service_ms_id1",
-// SchemaId: "first_schemaId_service_ms",
-// Schema: "first_schema_service_ms",
-// })
-// assert.NoError(t, err)
-// assert.Equal(t, pb.ResponseSuccess,
respModifySchema.Response.GetCode())
-//
-// respModifySchema, err =
datasource.Instance().ModifySchema(getContext(), &pb.ModifySchemaRequest{
-// ServiceId: "create_schema_prod_service_ms_id1",
-// SchemaId: "first_schemaId_service_ms",
-// Schema: "first_schema_change_service_ms",
-// Summary: "first0summary1change_service_ms",
-// })
-// assert.NoError(t, err)
-// assert.Equal(t, pb.ResponseSuccess,
respModifySchema.Response.GetCode())
-// existRes, err :=
datasource.Instance().ExistSchema(getContext(), &pb.GetExistenceRequest{
-// ServiceId: "create_schema_prod_service_ms_id1",
-// SchemaId: "first_schemaId_service_ms",
-// })
-// assert.NoError(t, err)
-// assert.Equal(t, pb.ResponseSuccess, existRes.Response.GetCode())
-// assert.Equal(t, "first0summary1change_service_ms",
existRes.Summary)
-//
-// resSchemas, err :=
datasource.Instance().GetAllSchemas(getContext(), &pb.GetAllSchemaRequest{
-// ServiceId: "create_schema_prod_service_ms_id1",
-// WithSchema: true,
-// })
-// assert.NoError(t, err)
-// assert.Equal(t, 1, len(resSchemas.Schemas))
-// })
-//}
+// TestInstance_Creat registers a service, registers instances with and without
+// an explicit instance ID, registers two instances sharing one endpoint, and
+// finally removes the service and instance documents of the default tenant.
+func TestInstance_Creat(t *testing.T) {
+	var serviceId string
+
+	// register registers one UP instance of the test service on UT-HOST.
+	// An empty instanceID leaves the ID choice to the datasource.
+	register := func(instanceID, endpoint string) (*pb.RegisterInstanceResponse, error) {
+		return datasource.Instance().RegisterInstance(getContext(), &pb.RegisterInstanceRequest{
+			Instance: &pb.MicroServiceInstance{
+				InstanceId: instanceID,
+				ServiceId:  serviceId,
+				Endpoints:  []string{endpoint},
+				HostName:   "UT-HOST",
+				Status:     pb.MSI_UP,
+			},
+		})
+	}
+
+	t.Run("create service, when request is valid, should pass", func(t *testing.T) {
+		created, err := datasource.Instance().RegisterService(getContext(), &pb.CreateServiceRequest{
+			Service: &pb.MicroService{
+				ServiceId:   "service1",
+				ServiceName: "create_instance_service_ms",
+				AppId:       "create_instance_ms",
+				Version:     "1.0.0",
+				Level:       "FRONT",
+				Status:      pb.MS_UP,
+			},
+		})
+		assert.NoError(t, err)
+		assert.Equal(t, pb.ResponseSuccess, created.Response.GetCode())
+		serviceId = created.ServiceId
+	})
+
+	t.Run("register instance, when request is valid, should pass", func(t *testing.T) {
+		const endpoint = "createInstance_ms:127.0.0.1:8080"
+
+		generated, err := register("", endpoint)
+		assert.NoError(t, err)
+		assert.Equal(t, pb.ResponseSuccess, generated.Response.GetCode())
+		assert.NotEqual(t, "ins_instance", generated.InstanceId)
+
+		explicit, err := register("instance2", endpoint)
+		assert.NoError(t, err)
+		assert.Equal(t, pb.ResponseSuccess, explicit.Response.GetCode())
+		assert.Equal(t, "instance2", explicit.InstanceId)
+	})
+
+	t.Run("update the same instance, should pass", func(t *testing.T) {
+		const endpoint = "sameInstance:127.0.0.1:8080"
+
+		first, err := register("instance3", endpoint)
+		assert.NoError(t, err)
+		assert.Equal(t, pb.ResponseSuccess, first.Response.GetCode())
+
+		second, err := register("instance4", endpoint)
+		assert.NoError(t, err)
+		assert.Equal(t, pb.ResponseSuccess, second.Response.GetCode())
+		assert.Equal(t, "instance4", second.InstanceId)
+	})
+
+	t.Run("delete test data", func(t *testing.T) {
+		tenant := bson.M{"domain": "default", "project": "default"}
+		for _, collection := range []string{mongo.CollectionService, mongo.CollectionInstance} {
+			_, err := client.GetMongoClient().Delete(getContext(), collection, tenant)
+			assert.NoError(t, err)
+		}
+	})
+}
+
+// TestInstance_update registers a service with one instance, walks the
+// instance through every status, updates its properties (including a large
+// property map and unknown service/instance IDs), and finally removes the
+// service and instance documents of the default tenant.
+func TestInstance_update(t *testing.T) {
+	var (
+		serviceId  string
+		instanceId string
+	)
+
+	t.Run("register service and instance, when request is valid, should pass", func(t *testing.T) {
+		createdService, err := datasource.Instance().RegisterService(getContext(), &pb.CreateServiceRequest{
+			Service: &pb.MicroService{
+				ServiceId:   "service1",
+				ServiceName: "update_instance_service_ms",
+				AppId:       "update_instance_service_ms",
+				Version:     "1.0.0",
+				Level:       "FRONT",
+				Status:      pb.MS_UP,
+			},
+		})
+		assert.NoError(t, err)
+		assert.Equal(t, pb.ResponseSuccess, createdService.Response.GetCode())
+		serviceId = createdService.ServiceId
+
+		createdInstance, err := datasource.Instance().RegisterInstance(getContext(), &pb.RegisterInstanceRequest{
+			Instance: &pb.MicroServiceInstance{
+				ServiceId:  serviceId,
+				InstanceId: "instance1",
+				Endpoints:  []string{"updateInstance:127.0.0.1:8080"},
+				HostName:   "UT-HOST-MS",
+				Status:     pb.MSI_UP,
+				Properties: map[string]string{"nodeIP": "test"},
+			},
+		})
+		assert.NoError(t, err)
+		assert.Equal(t, pb.ResponseSuccess, createdInstance.Response.GetCode())
+		instanceId = createdInstance.InstanceId
+	})
+
+	t.Run("update instance status, should pass", func(t *testing.T) {
+		// setStatus updates the status of one instance of the test service and
+		// checks whether the response code is (or is not) ResponseSuccess.
+		setStatus := func(targetInstance, status string, wantSuccess bool) {
+			resp, err := datasource.Instance().UpdateInstanceStatus(getContext(), &pb.UpdateInstanceStatusRequest{
+				ServiceId:  serviceId,
+				InstanceId: targetInstance,
+				Status:     status,
+			})
+			assert.NoError(t, err)
+			if wantSuccess {
+				assert.Equal(t, pb.ResponseSuccess, resp.Response.GetCode())
+			} else {
+				assert.NotEqual(t, pb.ResponseSuccess, resp.Response.GetCode())
+			}
+		}
+
+		for _, status := range []string{
+			pb.MSI_DOWN,
+			pb.MSI_OUTOFSERVICE,
+			pb.MSI_STARTING,
+			pb.MSI_TESTING,
+			pb.MSI_UP,
+		} {
+			setStatus(instanceId, status, true)
+		}
+
+		// An unknown instance must not be reported as success.
+		setStatus("notexistins", pb.MSI_STARTING, false)
+	})
+
+	t.Run("update instance properties, should pass", func(t *testing.T) {
+		// setProps updates the properties of one instance and checks whether
+		// the response code is (or is not) ResponseSuccess.
+		setProps := func(targetService, targetInstance string, props map[string]string, wantSuccess bool) {
+			resp, err := datasource.Instance().UpdateInstanceProperties(getContext(),
+				&pb.UpdateInstancePropsRequest{
+					ServiceId:  targetService,
+					InstanceId: targetInstance,
+					Properties: props,
+				})
+			assert.NoError(t, err)
+			if wantSuccess {
+				assert.Equal(t, pb.ResponseSuccess, resp.Response.GetCode())
+			} else {
+				assert.NotEqual(t, pb.ResponseSuccess, resp.Response.GetCode())
+			}
+		}
+
+		setProps(serviceId, instanceId, map[string]string{"test": "test"}, true)
+
+		// 1000 entries whose key and value are the index followed by 253 'x'.
+		size := 1000
+		bulk := make(map[string]string, size)
+		for i := 0; i < size; i++ {
+			entry := strconv.Itoa(i) + strings.Repeat("x", 253)
+			bulk[entry] = entry
+		}
+		setProps(serviceId, instanceId, bulk, true)
+
+		// Unknown instance.
+		setProps(serviceId, "not_exist_ins", map[string]string{"test": "test"}, false)
+
+		// No properties at all.
+		setProps(serviceId, instanceId, nil, true)
+
+		// Unknown service.
+		setProps("not_exist_service", instanceId, map[string]string{"test": "test"}, false)
+	})
+
+	t.Run("delete test data", func(t *testing.T) {
+		tenant := bson.M{"domain": "default", "project": "default"}
+		for _, collection := range []string{mongo.CollectionService, mongo.CollectionInstance} {
+			_, err := client.GetMongoClient().Delete(getContext(), collection, tenant)
+			assert.NoError(t, err)
+		}
+	})
+}
+
+// TestInstance_Query registers one service and one instance, looks the
+// instance up through FindInstances and BatchFind, and finally deletes every
+// service and instance document whose domain and project are "default".
+//
+// NOTE(review): the subtest names say "when request is invalid" although the
+// requests are populated and success is asserted; the wording looks
+// copy-pasted — confirm.
+func TestInstance_Query(t *testing.T) {
+
+ var (
+ serviceId1 string
+ instanceId1 string
+ )
+
+ // Setup: the IDs returned by the datasource are kept for the later subtests.
+ t.Run("register services and instance for testInstance_query, when
request is invalid, should pass", func(t *testing.T) {
+ insertServiceRes, err :=
datasource.Instance().RegisterService(getContext(), &pb.CreateServiceRequest{
+ Service: &pb.MicroService{
+ ServiceId: "service1",
+ AppId: "query_instance_ms",
+ ServiceName: "query_instance_service_ms",
+ Version: "1.0.0",
+ Level: "FRONT",
+ Status: pb.MS_UP,
+ },
+ })
+ assert.NoError(t, err)
+ assert.Equal(t, pb.ResponseSuccess,
insertServiceRes.Response.GetCode())
+ serviceId1 = insertServiceRes.ServiceId
+
+ insertInstanceRes, err :=
datasource.Instance().RegisterInstance(getContext(),
&pb.RegisterInstanceRequest{
+ Instance: &pb.MicroServiceInstance{
+ InstanceId: "instance1",
+ ServiceId: serviceId1,
+ HostName: "UT-HOST-MS",
+ Endpoints: []string{
+ "find:127.0.0.1:8080",
+ },
+ Status: pb.MSI_UP,
+ },
+ })
+ assert.NoError(t, err)
+ assert.Equal(t, pb.ResponseSuccess,
insertInstanceRes.Response.GetCode())
+ instanceId1 = insertInstanceRes.InstanceId
+ })
+
+ // The service queries itself (consumer == provider) with the "latest" rule
+ // and expects the instance registered above as the first result.
+ t.Run("query instance, when request is invalid, should pass", func(t
*testing.T) {
+ findRes, err :=
datasource.Instance().FindInstances(getContext(), &pb.FindInstancesRequest{
+ ConsumerServiceId: serviceId1,
+ AppId: "query_instance_ms",
+ ServiceName: "query_instance_service_ms",
+ VersionRule: "latest",
+ })
+ assert.NoError(t, err)
+ assert.Equal(t, pb.ResponseSuccess, findRes.Response.GetCode())
+ // NOTE(review): Instances[0] is indexed without a length check, so an
+ // empty result panics instead of failing the assertion.
+ assert.Equal(t, instanceId1, findRes.Instances[0].InstanceId)
+ })
+
+ // Three lookups of the same app/service with the version rules "latest",
+ // "1.0.0+" and "0.0.0" are sent in one batch request.
+ t.Run("batch query instance, when request is invalid, should pass",
func(t *testing.T) {
+ respFind, err := datasource.Instance().BatchFind(getContext(),
&pb.BatchFindInstancesRequest{
+ ConsumerServiceId: serviceId1,
+ Services: []*pb.FindService{
+ {
+ Service: &pb.MicroServiceKey{
+ AppId:
"query_instance_ms",
+ ServiceName:
"query_instance_service_ms",
+ Version: "latest",
+ },
+ },
+ {
+ Service: &pb.MicroServiceKey{
+ AppId:
"query_instance_ms",
+ ServiceName:
"query_instance_service_ms",
+ Version: "1.0.0+",
+ },
+ },
+ {
+ Service: &pb.MicroServiceKey{
+ AppId:
"query_instance_ms",
+ ServiceName:
"query_instance_service_ms",
+ Version: "0.0.0",
+ },
+ },
+ },
+ })
+ assert.NoError(t, err)
+ assert.Equal(t, pb.ResponseSuccess, respFind.Response.GetCode())
+ // Only the first entry of Updated is checked; it must carry index 0.
+ // NOTE(review): Updated[0] is indexed without a length check.
+ assert.Equal(t, int64(0), respFind.Services.Updated[0].Index)
+
+ })
+
+ // Cleanup: removes all "default"/"default" documents from both collections,
+ // not only the ones created by this test.
+ t.Run("delete test data", func(t *testing.T) {
+ _, err := client.GetMongoClient().Delete(getContext(),
mongo.CollectionService, bson.M{"domain": "default", "project": "default"})
+ assert.NoError(t, err)
+
+ _, err = client.GetMongoClient().Delete(getContext(),
mongo.CollectionInstance, bson.M{"domain": "default", "project": "default"})
+ assert.NoError(t, err)
+ })
+}
+
+// TestInstance_GetOne registers three services (versions 1.0.0 and 1.0.5 of
+// get_instance_service_ms under app get_instance_ms, plus one under app
+// get_instance_cross_ms) and one instance on the 1.0.5 service, then
+// exercises GetInstance and GetInstances against them. The last subtest
+// deletes every service and instance document whose domain and project are
+// "default".
+//
+// NOTE(review): several subtest names say "when request is invalid" although
+// the requests are populated and success is asserted; the wording looks
+// copy-pasted — confirm.
+func TestInstance_GetOne(t *testing.T) {
+
+ var (
+ serviceId1 string
+ serviceId2 string
+ serviceId3 string
+ instanceId2 string
+ )
+
+ // Setup: the IDs returned by the datasource are kept for the later subtests.
+ t.Run("register service and instances, when request is invalid, should
pass", func(t *testing.T) {
+ respCreateService, err :=
datasource.Instance().RegisterService(getContext(), &pb.CreateServiceRequest{
+ Service: &pb.MicroService{
+ ServiceId: "service1",
+ AppId: "get_instance_ms",
+ ServiceName: "get_instance_service_ms",
+ Version: "1.0.0",
+ Level: "FRONT",
+ Status: pb.MS_UP,
+ },
+ })
+ assert.NoError(t, err)
+ assert.Equal(t, pb.ResponseSuccess,
respCreateService.Response.GetCode())
+ serviceId1 = respCreateService.ServiceId
+
+ respCreateService, err =
datasource.Instance().RegisterService(getContext(), &pb.CreateServiceRequest{
+ Service: &pb.MicroService{
+ ServiceId: "service2",
+ AppId: "get_instance_ms",
+ ServiceName: "get_instance_service_ms",
+ Version: "1.0.5",
+ Level: "FRONT",
+ Status: pb.MS_UP,
+ },
+ })
+ assert.NoError(t, err)
+ assert.Equal(t, pb.ResponseSuccess,
respCreateService.Response.GetCode())
+ serviceId2 = respCreateService.ServiceId
+
+ respCreateInstance, err :=
datasource.Instance().RegisterInstance(getContext(),
&pb.RegisterInstanceRequest{
+ Instance: &pb.MicroServiceInstance{
+ InstanceId: "instance1",
+ ServiceId: serviceId2,
+ HostName: "UT-HOST-MS",
+ Endpoints: []string{
+ "get:127.0.0.2:8080",
+ },
+ Status: pb.MSI_UP,
+ },
+ })
+ assert.NoError(t, err)
+ // Check the instance registration response itself; asserting on the
+ // earlier service response would hide a failed instance registration.
+ assert.Equal(t, pb.ResponseSuccess,
respCreateInstance.Response.GetCode())
+ instanceId2 = respCreateInstance.InstanceId
+
+ respCreateService, err =
datasource.Instance().RegisterService(getContext(), &pb.CreateServiceRequest{
+ Service: &pb.MicroService{
+ ServiceId: "service3",
+ AppId: "get_instance_cross_ms",
+ ServiceName: "get_instance_service_ms",
+ Version: "1.0.0",
+ Level: "FRONT",
+ Status: pb.MS_UP,
+ },
+ })
+ assert.NoError(t, err)
+ assert.Equal(t, pb.ResponseSuccess,
respCreateService.Response.GetCode())
+ serviceId3 = respCreateService.ServiceId
+ })
+
+ // The consumer (serviceId3) was registered under a different app than the
+ // provider (serviceId2); fetching the provider's instance must still succeed.
+ t.Run("get between diff apps, when request is invalid, should pass",
func(t *testing.T) {
+ resp, err := datasource.Instance().GetInstance(getContext(),
&pb.GetOneInstanceRequest{
+ ConsumerServiceId: serviceId3,
+ ProviderServiceId: serviceId2,
+ ProviderInstanceId: instanceId2,
+ })
+ assert.NoError(t, err)
+ assert.Equal(t, pb.ResponseSuccess, resp.Response.GetCode())
+ })
+
+ // An unknown consumer must yield a non-success code without a Go error;
+ // a registered consumer must succeed.
+ t.Run("get instances, when request is invalid, should pass", func(t
*testing.T) {
+ resp, err := datasource.Instance().GetInstances(getContext(),
&pb.GetInstancesRequest{
+ ConsumerServiceId: "not-exist-service-ms",
+ ProviderServiceId: serviceId2,
+ })
+ assert.NoError(t, err)
+ assert.NotEqual(t, pb.ResponseSuccess, resp.Response.GetCode())
+ resp, err = datasource.Instance().GetInstances(getContext(),
&pb.GetInstancesRequest{
+ ConsumerServiceId: serviceId1,
+ ProviderServiceId: serviceId2,
+ })
+ assert.NoError(t, err)
+ assert.Equal(t, pb.ResponseSuccess, resp.Response.GetCode())
+ })
+
+ // Cleanup: removes all "default"/"default" documents from both collections,
+ // not only the ones created by this test.
+ t.Run("delete test data", func(t *testing.T) {
+ _, err := client.GetMongoClient().Delete(getContext(),
mongo.CollectionService, bson.M{"domain": "default", "project": "default"})
+ assert.NoError(t, err)
+
+ _, err = client.GetMongoClient().Delete(getContext(),
mongo.CollectionInstance, bson.M{"domain": "default", "project": "default"})
+ assert.NoError(t, err)
+ })
+}
+
+// TestInstance_Unregister registers one tagged service and one instance,
+// unregisters that instance, and finally deletes every service and instance
+// document whose domain and project are "default".
+//
+// NOTE(review): the subtest names say "when request is invalid" although the
+// requests are populated and success is asserted; the wording looks
+// copy-pasted — confirm.
+func TestInstance_Unregister(t *testing.T) {
+ var (
+ serviceId string
+ instanceId string
+ )
+
+ // Setup: the IDs returned by the datasource are kept for the unregister call.
+ t.Run("register service and instances, when request is invalid, should
pass", func(t *testing.T) {
+ respCreateService, err :=
datasource.Instance().RegisterService(getContext(), &pb.CreateServiceRequest{
+ Service: &pb.MicroService{
+ ServiceId: "service1",
+ AppId: "unregister_instance_ms",
+ ServiceName: "unregister_instance_service_ms",
+ Version: "1.0.5",
+ Level: "FRONT",
+ Status: pb.MS_UP,
+ },
+ Tags: map[string]string{
+ "test": "test",
+ },
+ })
+ assert.NoError(t, err)
+ assert.Equal(t, pb.ResponseSuccess,
respCreateService.Response.GetCode())
+ serviceId = respCreateService.ServiceId
+
+ respCreateInstance, err :=
datasource.Instance().RegisterInstance(getContext(),
&pb.RegisterInstanceRequest{
+ Instance: &pb.MicroServiceInstance{
+ InstanceId: "instance1",
+ ServiceId: serviceId,
+ HostName: "UT-HOST-MS",
+ Endpoints: []string{
+ "unregister:127.0.0.2:8080",
+ },
+ Status: pb.MSI_UP,
+ },
+ })
+ assert.NoError(t, err)
+ assert.Equal(t, pb.ResponseSuccess,
respCreateInstance.Response.GetCode())
+ instanceId = respCreateInstance.InstanceId
+ })
+
+ // Unregistering the instance created above must report success.
+ t.Run("unregister instance, when request is invalid, should pass",
func(t *testing.T) {
+ resp, err :=
datasource.Instance().UnregisterInstance(getContext(),
&pb.UnregisterInstanceRequest{
+ ServiceId: serviceId,
+ InstanceId: instanceId,
+ })
+ assert.NoError(t, err)
+ assert.Equal(t, pb.ResponseSuccess, resp.Response.GetCode())
+ })
+
+ // Cleanup: removes all "default"/"default" documents from both collections,
+ // not only the ones created by this test.
+ t.Run("delete test data", func(t *testing.T) {
+ _, err := client.GetMongoClient().Delete(getContext(),
mongo.CollectionService, bson.M{"domain": "default", "project": "default"})
+ assert.NoError(t, err)
+
+ _, err = client.GetMongoClient().Delete(getContext(),
mongo.CollectionInstance, bson.M{"domain": "default", "project": "default"})
+ assert.NoError(t, err)
+ })
+}
diff --git a/datasource/mongo/engine.go b/datasource/mongo/util.go
similarity index 60%
copy from datasource/mongo/engine.go
copy to datasource/mongo/util.go
index 246ce78..8de33ae 100644
--- a/datasource/mongo/engine.go
+++ b/datasource/mongo/util.go
@@ -17,29 +17,16 @@
package mongo
-import (
- "context"
- "time"
+import "strings"
- "github.com/apache/servicecomb-service-center/pkg/cluster"
-)
-
-func (ds *DataSource) SelfRegister(ctx context.Context) error {
- return nil
-}
-func (ds *DataSource) SelfUnregister(ctx context.Context) error {
- return nil
-}
-
-// OPS
-func (ds *DataSource) ClearNoInstanceServices(ctx context.Context, ttl
time.Duration) error {
- return nil
-}
-
-func (ds *DataSource) UpgradeVersion(ctx context.Context) error {
- return nil
-}
-
-func (ds *DataSource) GetClusters(ctx context.Context) (cluster.Clusters,
error) {
- return nil, nil
+// StringBuilder concatenates the elements of data into one string, inserting
+// a "." between consecutive elements.
+//
+// A nil or empty slice yields "" and a single element is returned unchanged.
+// Empty elements are kept, so {"", "x"} yields ".x".
+//
+// strings.Join already implements exactly this, so it is used instead of a
+// hand-written strings.Builder loop.
+func StringBuilder(data []string) string {
+ return strings.Join(data, ".")
}