tianxiaoliang commented on a change in pull request #759:
URL:
https://github.com/apache/servicecomb-service-center/pull/759#discussion_r531824689
##########
File path: datasource/mongo/ms_test.go
##########
@@ -650,3 +652,503 @@ func TestRuleUpdate(t *testing.T) {
// assert.Equal(t, 1, len(resSchemas.Schemas))
Review comment:
不要注释代码
##########
File path: datasource/mongo/ms.go
##########
@@ -1345,56 +1382,896 @@ func SchemaExist(ctx context.Context, serviceID,
schemaID string) (bool, error)
// Instance management
func (ds *DataSource) RegisterInstance(ctx context.Context, request
*pb.RegisterInstanceRequest) (*pb.RegisterInstanceResponse, error) {
- return &pb.RegisterInstanceResponse{}, nil
+ remoteIP := util.GetIPFromContext(ctx)
+ instance := request.Instance
+
+ // 允许自定义 id
+ if len(instance.InstanceId) > 0 {
+ resp, err := ds.Heartbeat(ctx, &pb.HeartbeatRequest{
+ InstanceId: instance.InstanceId,
+ ServiceId: instance.ServiceId,
+ })
+ if resp == nil {
+ log.Errorf(err, "register service[%s]'s instance
failed, endpoints %v, host '%s', operator %s",
+ instance.ServiceId, instance.Endpoints,
instance.HostName, remoteIP)
+ return &pb.RegisterInstanceResponse{
+ Response: pb.CreateResponse(pb.ErrInternal,
err.Error()),
+ }, nil
+ }
+ switch resp.Response.GetCode() {
+ case pb.ResponseSuccess:
+ log.Infof("register instance successful, reuse
instance[%s/%s], operator %s",
+ instance.ServiceId, instance.InstanceId,
remoteIP)
+ return &pb.RegisterInstanceResponse{
+ Response: resp.Response,
+ InstanceId: instance.InstanceId,
+ }, nil
+ case pb.ErrInstanceNotExists:
+ // register a new one
+ return registryInstance(ctx, request)
+ default:
+ log.Errorf(err, "register instance failed, reuse
instance[%s/%s], operator %s",
+ instance.ServiceId, instance.InstanceId,
remoteIP)
+ return &pb.RegisterInstanceResponse{
+ Response: resp.Response,
+ }, err
+ }
+ }
+
+ if err := preProcessRegisterInstance(ctx, instance); err != nil {
+ log.Errorf(err, "register service[%s]'s instance failed,
endpoints %v, host '%s', operator %s",
+ instance.ServiceId, instance.Endpoints,
instance.HostName, remoteIP)
+ return &pb.RegisterInstanceResponse{
+ Response: pb.CreateResponseWithSCErr(err),
+ }, nil
+ }
+ return registryInstance(ctx, request)
}
// GetInstances returns instances under the current domain
func (ds *DataSource) GetInstance(ctx context.Context, request
*pb.GetOneInstanceRequest) (*pb.GetOneInstanceResponse, error) {
- return &pb.GetOneInstanceResponse{}, nil
+ service := &Service{}
+ var err error
+ if len(request.ConsumerServiceId) > 0 {
+ filter := GeneratorServiceFilter(ctx, request.ConsumerServiceId)
+ service, err = GetService(ctx, filter)
+ if err != nil {
+ log.Errorf(err, "get consumer failed, consumer[%s] find
provider instance[%s/%s]",
+ request.ConsumerServiceId,
request.ProviderServiceId, request.ProviderInstanceId)
+ return &pb.GetOneInstanceResponse{
+ Response: pb.CreateResponse(pb.ErrInternal,
err.Error()),
+ }, err
+ }
+ if service == nil {
+ log.Errorf(nil, "consumer does not exist, consumer[%s]
find provider instance[%s/%s]",
+ request.ConsumerServiceId,
request.ProviderServiceId, request.ProviderInstanceId)
+ return &pb.GetOneInstanceResponse{
+ Response:
pb.CreateResponse(pb.ErrServiceNotExists,
+ fmt.Sprintf("Consumer[%s] does not
exist.", request.ConsumerServiceId)),
+ }, nil
+ }
+ }
+
+ filter := GeneratorServiceFilter(ctx, request.ProviderServiceId)
+ provider, err := GetService(ctx, filter)
+ if err != nil {
+ log.Errorf(err, "get provider failed, consumer[%s] find
provider instance[%s/%s]",
+ request.ConsumerServiceId, request.ProviderServiceId,
request.ProviderInstanceId)
+ return &pb.GetOneInstanceResponse{
+ Response: pb.CreateResponse(pb.ErrInternal,
err.Error()),
+ }, err
+ }
+ if provider == nil {
+ log.Errorf(nil, "provider does not exist, consumer[%s] find
provider instance[%s/%s]",
+ request.ConsumerServiceId, request.ProviderServiceId,
request.ProviderInstanceId)
+ return &pb.GetOneInstanceResponse{
+ Response: pb.CreateResponse(pb.ErrServiceNotExists,
+ fmt.Sprintf("Provider[%s] does not exist.",
request.ProviderServiceId)),
+ }, nil
+ }
+
+ findFlag := func() string {
+ return fmt.Sprintf("Consumer[%s][%s/%s/%s/%s] find
provider[%s][%s/%s/%s/%s] instance[%s]",
+ request.ConsumerServiceId,
service.ServiceInfo.Environment, service.ServiceInfo.AppId,
service.ServiceInfo.ServiceName, service.ServiceInfo.Version,
+ provider.ServiceInfo.ServiceId,
provider.ServiceInfo.Environment, provider.ServiceInfo.AppId,
provider.ServiceInfo.ServiceName, provider.ServiceInfo.Version,
+ request.ProviderInstanceId)
+ }
+
+ domain := util.ParseDomain(ctx)
+ project := util.ParseProject(ctx)
+ filter = bson.M{
+ Domain: domain,
+ Project: project,
+ strings.Join([]string{InstanceInfo, ServiceID}, "."):
request.ProviderServiceId}
+ findOneRes, err := client.GetMongoClient().FindOne(ctx,
CollectionInstance, filter)
+ if err != nil {
+ mes := fmt.Errorf("%s failed, provider instance does not
exist", findFlag())
+ log.Errorf(mes, "FindInstances.GetWithProviderID failed")
+ return &pb.GetOneInstanceResponse{
+ Response: pb.CreateResponse(pb.ErrInstanceNotExists,
mes.Error()),
+ }, nil
+ }
+ var instance Instance
+ err = findOneRes.Decode(&instance)
+ if err != nil {
+ log.Errorf(err, "FindInstances.GetWithProviderID failed, %s
failed", findFlag())
+ return &pb.GetOneInstanceResponse{
+ Response: pb.CreateResponse(pb.ErrInternal,
err.Error()),
+ }, err
+ }
+
+ return &pb.GetOneInstanceResponse{
+ Response: pb.CreateResponse(pb.ResponseSuccess, "Get instance
successfully."),
+ Instance: instance.InstanceInfo,
+ }, nil
}
func (ds *DataSource) GetInstances(ctx context.Context, request
*pb.GetInstancesRequest) (*pb.GetInstancesResponse, error) {
- return &pb.GetInstancesResponse{}, nil
+ service := &Service{}
+ var err error
+
+ if len(request.ConsumerServiceId) > 0 {
+ filter := GeneratorServiceFilter(ctx, request.ConsumerServiceId)
+ service, err = GetService(ctx, filter)
+ if err != nil {
+ log.Errorf(err, "get consumer failed, consumer[%s] find
provider instances",
+ request.ConsumerServiceId,
request.ProviderServiceId)
+ return &pb.GetInstancesResponse{
+ Response: pb.CreateResponse(pb.ErrInternal,
err.Error()),
+ }, err
+ }
+ if service == nil {
+ log.Errorf(nil, "consumer does not exist, consumer[%s]
find provider instances",
+ request.ConsumerServiceId,
request.ProviderServiceId)
+ return &pb.GetInstancesResponse{
+ Response:
pb.CreateResponse(pb.ErrServiceNotExists,
+ fmt.Sprintf("Consumer[%s] does not
exist.", request.ConsumerServiceId)),
+ }, nil
+ }
+ }
+
+ filter := GeneratorServiceFilter(ctx, request.ProviderServiceId)
+ provider, err := GetService(ctx, filter)
+ if err != nil {
+ log.Errorf(err, "get provider failed, consumer[%s] find
provider instances",
+ request.ConsumerServiceId, request.ProviderServiceId)
+ return &pb.GetInstancesResponse{
+ Response: pb.CreateResponse(pb.ErrInternal,
err.Error()),
+ }, err
+ }
+ if provider == nil {
+ log.Errorf(nil, "provider does not exist, consumer[%s] find
provider instances",
+ request.ConsumerServiceId, request.ProviderServiceId)
+ return &pb.GetInstancesResponse{
+ Response: pb.CreateResponse(pb.ErrServiceNotExists,
+ fmt.Sprintf("Provider[%s] does not exist.",
request.ProviderServiceId)),
+ }, nil
+ }
+
+ findFlag := fmt.Sprintf("Consumer[%s][%s/%s/%s/%s] find
provider[%s][%s/%s/%s/%s] instances",
+ request.ConsumerServiceId, service.ServiceInfo.Environment,
service.ServiceInfo.AppId, service.ServiceInfo.ServiceName,
service.ServiceInfo.Version,
+ provider.ServiceInfo.ServiceId,
provider.ServiceInfo.Environment, provider.ServiceInfo.AppId,
provider.ServiceInfo.ServiceName, provider.ServiceInfo.Version)
+
+ domain := util.ParseDomain(ctx)
+ project := util.ParseProject(ctx)
+ filter = bson.M{
+ Domain: domain,
+ Project: project,
+ strings.Join([]string{InstanceInfo, ServiceID}, "."):
request.ProviderServiceId}
+ resp, err := client.GetMongoClient().Find(ctx, CollectionInstance,
filter)
+ if err != nil {
+ log.Errorf(err, "FindInstancesCache.Get failed, %s failed",
findFlag)
+ return &pb.GetInstancesResponse{
+ Response: pb.CreateResponse(pb.ErrInternal,
err.Error()),
+ }, err
+ }
+ if resp == nil {
+ mes := fmt.Errorf("%s failed, provider does not exist",
findFlag)
+ log.Errorf(mes, "FindInstancesCache.Get failed")
+ return &pb.GetInstancesResponse{
+ Response: pb.CreateResponse(pb.ErrServiceNotExists,
mes.Error()),
+ }, nil
+ }
+
+ var instances []*pb.MicroServiceInstance
+ for resp.Next(ctx) {
+ var instance Instance
+ err := resp.Decode(&instance)
+ if err != nil {
+ log.Errorf(err, "FindInstances.GetWithProviderID
failed, %s failed", findFlag)
+ return &pb.GetInstancesResponse{
+ Response: pb.CreateResponse(pb.ErrInternal,
err.Error()),
+ }, err
+ }
+ instances = append(instances, instance.InstanceInfo)
+ }
+
+ return &pb.GetInstancesResponse{
+ Response: pb.CreateResponse(pb.ResponseSuccess, "Query service
instances successfully."),
+ Instances: instances,
+ }, nil
}
// GetProviderInstances returns instances under the specified domain
func (ds *DataSource) GetProviderInstances(ctx context.Context, request
*pb.GetProviderInstancesRequest) (instances []*pb.MicroServiceInstance, rev
string, err error) {
- return nil, "", nil
+ domain := util.ParseDomain(ctx)
+ project := util.ParseProject(ctx)
+ filter := bson.M{
+ Domain: domain,
+ Project: project,
+ strings.Join([]string{InstanceInfo, ServiceID}, "."):
request.ProviderServiceId}
+
+ findRes, err := client.GetMongoClient().Find(ctx, CollectionInstance,
filter)
+ if err != nil {
+ return
+ }
+
+ for findRes.Next(ctx) {
+ var mongoInstance Instance
+ err := findRes.Decode(&mongoInstance)
+ if err == nil {
+ instances = append(instances,
mongoInstance.InstanceInfo)
+ }
+ }
+
+ return instances, "", nil
}
func (ds *DataSource) GetAllInstances(ctx context.Context, request
*pb.GetAllInstancesRequest) (*pb.GetAllInstancesResponse, error) {
- return &pb.GetAllInstancesResponse{}, nil
+
+ domain := util.ParseDomain(ctx)
+ project := util.ParseProject(ctx)
+
+ filter := bson.M{Domain: domain, Project: project}
+
+ findRes, err := client.GetMongoClient().Find(ctx, CollectionInstance,
filter)
+ if err != nil {
+ return nil, err
+ }
+ resp := &pb.GetAllInstancesResponse{
+ Response: pb.CreateResponse(pb.ResponseSuccess, "Get all
instances successfully"),
+ }
+
+ for findRes.Next(ctx) {
+ var instance Instance
+ err := findRes.Decode(&instance)
+ if err != nil {
+ return &pb.GetAllInstancesResponse{
+ Response: pb.CreateResponse(pb.ErrInternal,
err.Error()),
+ }, err
+ }
+ resp.Instances = append(resp.Instances, instance.InstanceInfo)
+ }
+
+ return resp, nil
}
func (ds *DataSource) BatchGetProviderInstances(ctx context.Context, request
*pb.BatchGetInstancesRequest) (instances []*pb.MicroServiceInstance, rev
string, err error) {
- return nil, "", nil
+ if request == nil || len(request.ServiceIds) == 0 {
+ return nil, "", fmt.Errorf("invalid param
BatchGetInstancesRequest")
+ }
+
+ domain := util.ParseDomain(ctx)
+ project := util.ParseProject(ctx)
+
+ for _, providerServiceID := range request.ServiceIds {
+ // todo finish find instances
+
+ filter := bson.M{
+ Domain: domain,
+ Project: project,
+ strings.Join([]string{InstanceInfo, ServiceID}, "."):
providerServiceID}
+ findRes, err := client.GetMongoClient().Find(ctx,
CollectionInstance, filter)
+ if err != nil {
+ return instances, "", nil
+ }
+
+ for findRes.Next(ctx) {
+ var mongoInstance Instance
+ err := findRes.Decode(&mongoInstance)
+ if err == nil {
+ instances = append(instances,
mongoInstance.InstanceInfo)
+ }
+ }
+ }
+
+ return instances, "", nil
}
// FindInstances returns instances under the specified domain
func (ds *DataSource) FindInstances(ctx context.Context, request
*pb.FindInstancesRequest) (*pb.FindInstancesResponse, error) {
- return &pb.FindInstancesResponse{}, nil
+ provider := &pb.MicroServiceKey{
+ Tenant: util.ParseTargetDomainProject(ctx),
+ Environment: request.Environment,
+ AppId: request.AppId,
+ ServiceName: request.ServiceName,
+ Alias: request.ServiceName,
+ Version: request.VersionRule,
+ }
+
+ return ds.findInstance(ctx, request, provider)
}
func (ds *DataSource) UpdateInstanceStatus(ctx context.Context, request
*pb.UpdateInstanceStatusRequest) (*pb.UpdateInstanceStatusResponse, error) {
- return &pb.UpdateInstanceStatusResponse{}, nil
+ updateStatusFlag := util.StringJoin([]string{request.ServiceId,
request.InstanceId, request.Status}, "/")
+
+ // todo finish get instance
+ instance, err := GetInstance(ctx, request.ServiceId, request.InstanceId)
+ if err != nil {
+ log.Errorf(err, "update instance[%s] status failed",
updateStatusFlag)
+ return &pb.UpdateInstanceStatusResponse{
+ Response: pb.CreateResponse(pb.ErrInternal,
err.Error()),
+ }, err
+ }
+ if instance == nil {
+ log.Errorf(nil, "update instance[%s] status failed, instance
does not exist", updateStatusFlag)
+ return &pb.UpdateInstanceStatusResponse{
+ Response: pb.CreateResponse(pb.ErrInstanceNotExists,
"Service instance does not exist."),
+ }, nil
+ }
+
+ copyInstanceRef := *instance
+ copyInstanceRef.InstanceInfo.Status = request.Status
+
+ if err := UpdateInstanceS(ctx, copyInstanceRef.InstanceInfo); err !=
nil {
+ log.Errorf(err, "update instance[%s] status failed",
updateStatusFlag)
+ resp := &pb.UpdateInstanceStatusResponse{
+ Response: pb.CreateResponseWithSCErr(err),
+ }
+ if err.InternalError() {
+ return resp, err
+ }
+ return resp, nil
+ }
+
+ log.Infof("update instance[%s] status successfully", updateStatusFlag)
+ return &pb.UpdateInstanceStatusResponse{
+ Response: pb.CreateResponse(pb.ResponseSuccess, "Update service
instance status successfully."),
+ }, nil
}
func (ds *DataSource) UpdateInstanceProperties(ctx context.Context, request
*pb.UpdateInstancePropsRequest) (*pb.UpdateInstancePropsResponse, error) {
- return &pb.UpdateInstancePropsResponse{}, nil
+ instanceFlag := util.StringJoin([]string{request.ServiceId,
request.InstanceId}, "/")
+
+ instance, err := GetInstance(ctx, request.ServiceId, request.InstanceId)
+ if err != nil {
+ log.Errorf(err, "update instance[%s] properties failed",
instanceFlag)
+ return &pb.UpdateInstancePropsResponse{
+ Response: pb.CreateResponse(pb.ErrInternal,
err.Error()),
+ }, err
+ }
+ if instance == nil {
+ log.Errorf(nil, "update instance[%s] properties failed,
instance does not exist", instanceFlag)
+ return &pb.UpdateInstancePropsResponse{
+ Response: pb.CreateResponse(pb.ErrInstanceNotExists,
"Service instance does not exist."),
+ }, nil
+ }
+
+ copyInstanceRef := *instance
+ copyInstanceRef.InstanceInfo.Properties = request.Properties
+
+ // todo finish update instance
+ if err := UpdateInstanceP(ctx, copyInstanceRef.InstanceInfo); err !=
nil {
+ log.Errorf(err, "update instance[%s] properties failed",
instanceFlag)
+ resp := &pb.UpdateInstancePropsResponse{
+ Response: pb.CreateResponseWithSCErr(err),
+ }
+ if err.InternalError() {
+ return resp, err
+ }
+ return resp, nil
+ }
+
+ log.Infof("update instance[%s] properties successfully", instanceFlag)
+ return &pb.UpdateInstancePropsResponse{
+ Response: pb.CreateResponse(pb.ResponseSuccess, "Update service
instance properties successfully."),
+ }, nil
}
func (ds *DataSource) UnregisterInstance(ctx context.Context, request
*pb.UnregisterInstanceRequest) (*pb.UnregisterInstanceResponse, error) {
- return &pb.UnregisterInstanceResponse{}, nil
+ remoteIP := util.GetIPFromContext(ctx)
+ serviceID := request.ServiceId
+ instanceID := request.InstanceId
+
+ instanceFlag := util.StringJoin([]string{serviceID, instanceID}, "/")
+
+ // todo finish revoke instance
+ //err := revokeInstance(ctx, "", serviceID, instanceID)
+ //if err != nil {
+ // log.Errorf(err, "unregister instance failed, instance[%s],
operator %s: revoke instance failed",
+ // instanceFlag, remoteIP)
+ // resp := &pb.UnregisterInstanceResponse{
Review comment:
1
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]