This is an automated email from the ASF dual-hosted git repository.
littlecui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/servicecomb-service-center.git
The following commit(s) were added to refs/heads/master by this push:
new 61f0575 SCB-1053 Add instances request struct (#508)
61f0575 is described below
commit 61f0575c761e46d2ca9232a992688f1b11c9d120
Author: little-cui <[email protected]>
AuthorDate: Tue Dec 11 21:25:26 2018 +0800
SCB-1053 Add instances request struct (#508)
* SCB-1053 Add instances request struct
* SCB-1053 Implement batch find instances
* SCB-1053 Add UTs
* SCB-1053 Implement Query instances by provider id
* SCB-1053 Fix IT failure
* SCB-1053 Bug fix
---
integration/instances_test.go | 35 +++-
server/core/proto/batch_find.go | 23 +-
server/core/swagger/v4.yaml | 21 +-
server/rest/controller/v4/instance_controller.go | 17 ++
server/service/cache/common.go | 9 +-
server/service/cache/filter_instances.go | 87 ++++++--
server/service/cache/filter_rev.go | 25 ++-
server/service/cache/filter_version.go | 14 +-
server/service/cache/instance.go | 13 +-
server/service/instance.go | 254 +++++++++++++++++------
server/service/instance_test.go | 189 ++++++++++++++---
server/service/instance_validator.go | 5 +-
server/service/rule_test.go | 6 +-
server/service/util/instance_util.go | 8 +-
server/service/util/instance_util_test.go | 22 +-
15 files changed, 562 insertions(+), 166 deletions(-)
diff --git a/integration/instances_test.go b/integration/instances_test.go
index 86976a5..2f1dbaa 100644
--- a/integration/instances_test.go
+++ b/integration/instances_test.go
@@ -364,11 +364,27 @@ var _ = Describe("MicroService Api Test", func() {
"version": serviceVersion,
},
}
+ notExistsInstance := map[string]interface{}{
+ "instance": map[string]interface{}{
+ "serviceId": serviceId,
+ "instanceId": "notexisted",
+ },
+ }
+ providerInstance := map[string]interface{}{
+ "instance": map[string]interface{}{
+ "serviceId": serviceId,
+ "instanceId": serviceInstanceID,
+ },
+ }
findRequest := map[string]interface{}{
"services": []map[string]interface{}{
provider,
notExistsService,
},
+ "instances": []map[string]interface{}{
+ providerInstance,
+ notExistsInstance,
+ },
}
body, _ := json.Marshal(findRequest)
bodyBuf := bytes.NewReader(body)
@@ -378,8 +394,10 @@ var _ = Describe("MicroService Api Test", func() {
resp, _ := scclient.Do(req)
respbody, _ := ioutil.ReadAll(resp.Body)
Expect(resp.StatusCode).To(Equal(http.StatusOK))
- servicesStruct := map[string][]map[string]interface{}{}
- json.Unmarshal(respbody, &servicesStruct)
+ respStruct := map[string]map[string][]map[string]interface{}{}
+ json.Unmarshal(respbody, &respStruct)
+ servicesStruct := respStruct["services"]
+ instancesStruct := respStruct["instances"]
failed := false
for _, services := range servicesStruct["failed"] {
a := services["indexes"].([]interface{})[0] == 1.0
@@ -393,6 +411,19 @@ var _ = Describe("MicroService Api Test", func() {
Expect(servicesStruct["updated"][0]["index"]).To(Equal(0.0))
Expect(len(servicesStruct["updated"][0]["instances"].([]interface{}))).
ToNot(Equal(0))
+ failed = false
+ for _, instances := range instancesStruct["failed"] {
+ a := instances["indexes"].([]interface{})[0] == 1.0
+ b := instances["error"].(map[string]interface{})["errorCode"] == "400017"
+ if a && b {
+ failed = true
+ break
+ }
+ }
+ Expect(failed).To(Equal(true))
+ Expect(instancesStruct["updated"][0]["index"]).To(Equal(0.0))
+ Expect(len(instancesStruct["updated"][0]["instances"].([]interface{}))).
+ ToNot(Equal(0))
})
})
diff --git a/server/core/proto/batch_find.go b/server/core/proto/batch_find.go
index 16eb99c..fc02021 100644
--- a/server/core/proto/batch_find.go
+++ b/server/core/proto/batch_find.go
@@ -24,6 +24,11 @@ type FindService struct {
Rev string `protobuf:"bytes,2,opt,name=rev"
json:"rev,omitempty"`
}
+type FindInstance struct {
+ Instance *HeartbeatSetElement `protobuf:"bytes,1,opt,name=instance" json:"instance"`
+ Rev string `protobuf:"bytes,2,opt,name=rev" json:"rev,omitempty"`
+}
+
type FindResult struct {
Index int64 `protobuf:"varint,1,opt,name=index"
json:"index"`
Rev string `protobuf:"bytes,2,opt,name=rev"
json:"rev"`
@@ -35,14 +40,20 @@ type FindFailedResult struct {
Error *scerr.Error `protobuf:"bytes,2,opt,name=error" json:"error"`
}
+type BatchFindResult struct {
+ Failed []*FindFailedResult `protobuf:"bytes,1,rep,name=failed" json:"failed,omitempty"`
+ NotModified []int64 `protobuf:"varint,2,rep,packed,name=notModified" json:"notModified,omitempty"`
+ Updated []*FindResult `protobuf:"bytes,3,rep,name=updated" json:"updated,omitempty"`
+}
+
type BatchFindInstancesRequest struct {
- ConsumerServiceId string
`protobuf:"bytes,1,opt,name=consumerServiceId"
json:"consumerServiceId,omitempty"`
- Services []*FindService `protobuf:"bytes,2,rep,name=services"
json:"services"`
+ ConsumerServiceId string
`protobuf:"bytes,1,opt,name=consumerServiceId"
json:"consumerServiceId,omitempty"`
+ Services []*FindService `protobuf:"bytes,2,rep,name=services"
json:"services,omitempty"`
+ Instances []*FindInstance
`protobuf:"bytes,3,rep,name=instances" json:"instances,omitempty"`
}
type BatchFindInstancesResponse struct {
- Response *Response `protobuf:"bytes,1,opt,name=response"
json:"response,omitempty"`
- Failed []*FindFailedResult `protobuf:"bytes,2,rep,name=failed"
json:"failed,omitempty"`
- NotModified []int64
`protobuf:"varint,3,rep,packed,name=notModified" json:"notModified,omitempty"`
- Updated []*FindResult `protobuf:"bytes,4,rep,name=updated"
json:"updated,omitempty"`
+ Response *Response `protobuf:"bytes,1,opt,name=response"
json:"response,omitempty"`
+ Services *BatchFindResult `protobuf:"bytes,2,rep,name=services"
json:"services,omitempty"`
+ Instances *BatchFindResult `protobuf:"bytes,3,rep,name=instances"
json:"instances,omitempty"`
}
diff --git a/server/core/swagger/v4.yaml b/server/core/swagger/v4.yaml
index 6e2e45b..79947d1 100644
--- a/server/core/swagger/v4.yaml
+++ b/server/core/swagger/v4.yaml
@@ -1964,6 +1964,14 @@ definitions:
rev:
type: string
description: 客户端缓存的版本号。
+ FindInstance:
+ type: object
+ properties:
+ instance:
+ $ref: '#/definitions/HeartbeatSetElement'
+ rev:
+ type: string
+ description: 客户端缓存的版本号。
BatchFindRequest:
type: object
properties:
@@ -1971,6 +1979,10 @@ definitions:
type: array
items:
$ref: '#/definitions/FindService'
+ instances:
+ type: array
+ items:
+ $ref: '#/definitions/FindInstance'
FindResult:
type: object
properties:
@@ -1994,7 +2006,7 @@ definitions:
description: 与请求数组对应的索引集合。
error:
$ref: '#/definitions/Error'
- BatchFindResponse:
+ BatchFindResult:
type: object
properties:
failed:
@@ -2010,6 +2022,13 @@ definitions:
type: array
items:
$ref: '#/definitions/FindResult'
+ BatchFindResponse:
+ type: object
+ properties:
+ services:
+ $ref: '#/definitions/BatchFindResult'
+ instances:
+ $ref: '#/definitions/BatchFindResult'
CreateDependenciesRequest:
type: object
properties:
diff --git a/server/rest/controller/v4/instance_controller.go b/server/rest/controller/v4/instance_controller.go
index 36efd16..5124a59 100644
--- a/server/rest/controller/v4/instance_controller.go
+++ b/server/rest/controller/v4/instance_controller.go
@@ -191,9 +191,18 @@ func (this *MicroServiceInstanceService) GetOneInstance(w
http.ResponseWriter, r
ProviderInstanceId: query.Get(":instanceId"),
Tags: ids,
}
+
resp, _ := core.InstanceAPI.GetOneInstance(r.Context(), request)
respInternal := resp.Response
resp.Response = nil
+
+ iv, _ := r.Context().Value(serviceUtil.CTX_REQUEST_REVISION).(string)
+ ov, _ := r.Context().Value(serviceUtil.CTX_RESPONSE_REVISION).(string)
+ w.Header().Set(serviceUtil.HEADER_REV, ov)
+ if len(iv) > 0 && iv == ov {
+ w.WriteHeader(http.StatusNotModified)
+ return
+ }
controller.WriteResponse(w, respInternal, resp)
}
@@ -212,6 +221,14 @@ func (this *MicroServiceInstanceService) GetInstances(w
http.ResponseWriter, r *
resp, _ := core.InstanceAPI.GetInstances(r.Context(), request)
respInternal := resp.Response
resp.Response = nil
+
+ iv, _ := r.Context().Value(serviceUtil.CTX_REQUEST_REVISION).(string)
+ ov, _ := r.Context().Value(serviceUtil.CTX_RESPONSE_REVISION).(string)
+ w.Header().Set(serviceUtil.HEADER_REV, ov)
+ if len(iv) > 0 && iv == ov {
+ w.WriteHeader(http.StatusNotModified)
+ return
+ }
controller.WriteResponse(w, respInternal, resp)
}
diff --git a/server/service/cache/common.go b/server/service/cache/common.go
index b25c92d..d153a9d 100644
--- a/server/service/cache/common.go
+++ b/server/service/cache/common.go
@@ -17,10 +17,11 @@
package cache
const (
- CTX_FIND_CONSUMER = "_consumer"
- CTX_FIND_PROVIDER = "_provider"
- CTX_FIND_TAGS = "_tags"
- CTX_FIND_REQUEST_REV = "_rev"
+ CTX_FIND_CONSUMER = "_consumer"
+ CTX_FIND_PROVIDER = "_provider"
+ CTX_FIND_PROVIDER_INSTANCE = "_provider_instance"
+ CTX_FIND_TAGS = "_tags"
+ CTX_FIND_REQUEST_REV = "_rev"
CACHE_FIND = "_find"
CACHE_DEP = "_dep"
diff --git a/server/service/cache/filter_instances.go
b/server/service/cache/filter_instances.go
index 0f26750..4c0b58b 100644
--- a/server/service/cache/filter_instances.go
+++ b/server/service/cache/filter_instances.go
@@ -46,12 +46,17 @@ type InstancesFilter struct {
}
func (f *InstancesFilter) Name(ctx context.Context, _ *cache.Node) string {
+ instanceKey, ok := ctx.Value(CTX_FIND_PROVIDER_INSTANCE).(*pb.HeartbeatSetElement)
+ if ok {
+ return instanceKey.ServiceId + apt.SPLIT + instanceKey.InstanceId
+ }
return ""
}
func (f *InstancesFilter) Init(ctx context.Context, parent *cache.Node) (node
*cache.Node, err error) {
pCopy := *parent.Cache.Get(CACHE_FIND).(*VersionRuleCacheItem)
- pCopy.Instances, pCopy.Rev, err = f.FindInstances(ctx, pCopy.ServiceIds)
+
+ pCopy.Instances, pCopy.Rev, err = f.Find(ctx, parent)
if err != nil {
return
}
@@ -62,34 +67,76 @@ func (f *InstancesFilter) Init(ctx context.Context, parent
*cache.Node) (node *c
return
}
-func (f *InstancesFilter) FindInstances(ctx context.Context, serviceIds
[]string) (instances []*pb.MicroServiceInstance, rev string, err error) {
+func (f *InstancesFilter) Find(ctx context.Context, parent *cache.Node) (
+ instances []*pb.MicroServiceInstance, rev string, err error) {
+ pCache := parent.Cache.Get(CACHE_FIND).(*VersionRuleCacheItem)
provider := ctx.Value(CTX_FIND_PROVIDER).(*pb.MicroServiceKey)
+
+ instanceKey, ok :=
ctx.Value(CTX_FIND_PROVIDER_INSTANCE).(*pb.HeartbeatSetElement)
+ if ok {
+ if len(pCache.ServiceIds) == 0 {
+ // can not find by instanceKey.ServiceId after pre-filters init
+ return
+ }
+ instances, rev, err = f.FindInstances(ctx, provider.Tenant,
instanceKey)
+ } else {
+ instances, rev, err = f.BatchFindInstances(ctx,
provider.Tenant, pCache.ServiceIds)
+ }
+ if err != nil {
+ consumer := ctx.Value(CTX_FIND_CONSUMER).(*pb.MicroService)
+ findFlag := fmt.Sprintf("consumer '%s' find provider %s/%s/%s",
consumer.ServiceId,
+ provider.AppId, provider.ServiceName, provider.Version)
+ log.Errorf(err, "Find failed, %s", findFlag)
+ }
+ return
+}
+
+func (f *InstancesFilter) findInstances(ctx context.Context, domainProject,
serviceId, instanceId string, maxRevs []int64, counts []int64) (instances
[]*pb.MicroServiceInstance, err error) {
+ key := apt.GenerateInstanceKey(domainProject, serviceId, instanceId)
+ opts := append(serviceUtil.FromContext(ctx), registry.WithStrKey(key),
registry.WithPrefix())
+ resp, err := backend.Store().Instance().Search(ctx, opts...)
+ if err != nil {
+ return nil, err
+ }
+ if len(resp.Kvs) == 0 {
+ return
+ }
+
+ for _, kv := range resp.Kvs {
+ if i, ok := clustersIndex[kv.ClusterName]; ok {
+ if kv.ModRevision > maxRevs[i] {
+ maxRevs[i] = kv.ModRevision
+ }
+ counts[i]++
+ }
+ instances = append(instances,
kv.Value.(*pb.MicroServiceInstance))
+ }
+ return
+}
+
+func (f *InstancesFilter) FindInstances(ctx context.Context, domainProject
string, instanceKey *pb.HeartbeatSetElement) (instances
[]*pb.MicroServiceInstance, rev string, err error) {
+ var (
+ maxRevs = make([]int64, len(clustersIndex))
+ counts = make([]int64, len(clustersIndex))
+ )
+ instances, err = f.findInstances(ctx, domainProject,
instanceKey.ServiceId, instanceKey.InstanceId, maxRevs, counts)
+ if err != nil {
+ return
+ }
+ return instances, serviceUtil.FormatRevision(maxRevs, counts), nil
+}
+
+func (f *InstancesFilter) BatchFindInstances(ctx context.Context,
domainProject string, serviceIds []string) (instances
[]*pb.MicroServiceInstance, rev string, err error) {
var (
maxRevs = make([]int64, len(clustersIndex))
counts = make([]int64, len(clustersIndex))
)
for _, providerServiceId := range serviceIds {
- key := apt.GenerateInstanceKey(provider.Tenant,
providerServiceId, "")
- opts := append(serviceUtil.FromContext(ctx),
registry.WithStrKey(key), registry.WithPrefix())
- resp, err := backend.Store().Instance().Search(ctx, opts...)
+ insts, err := f.findInstances(ctx, domainProject,
providerServiceId, "", maxRevs, counts)
if err != nil {
- consumer :=
ctx.Value(CTX_FIND_CONSUMER).(*pb.MicroService)
- findFlag := fmt.Sprintf("consumer '%s' find provider
%s/%s/%s", consumer.ServiceId,
- provider.AppId, provider.ServiceName,
provider.Version)
- log.Errorf(err, "Instance().Search failed, %s",
findFlag)
return nil, "", err
}
-
- for _, kv := range resp.Kvs {
- if i, ok := clustersIndex[kv.ClusterName]; ok {
- if kv.ModRevision > maxRevs[i] {
- maxRevs[i] = kv.ModRevision
- }
- counts[i]++
- }
- instances = append(instances,
kv.Value.(*pb.MicroServiceInstance))
- }
-
+ instances = append(instances, insts...)
}
return instances, serviceUtil.FormatRevision(maxRevs, counts), nil
diff --git a/server/service/cache/filter_rev.go
b/server/service/cache/filter_rev.go
index d3eb5a4..381b033 100644
--- a/server/service/cache/filter_rev.go
+++ b/server/service/cache/filter_rev.go
@@ -38,35 +38,34 @@ func (f *RevisionFilter) Name(ctx context.Context, parent
*cache.Node) string {
}
func (f *RevisionFilter) Init(ctx context.Context, parent *cache.Node) (node
*cache.Node, err error) {
- item := parent.Cache.Get(CACHE_FIND).(*VersionRuleCacheItem)
+ pCache := parent.Cache.Get(CACHE_FIND).(*VersionRuleCacheItem)
requestRev := ctx.Value(CTX_FIND_REQUEST_REV).(string)
- if len(requestRev) == 0 || requestRev == item.Rev {
+ if len(requestRev) == 0 || requestRev == pCache.Rev {
node = cache.NewNode()
- node.Cache.Set(CACHE_FIND, item)
+ node.Cache.Set(CACHE_FIND, pCache)
return
}
- if item.BrokenWait() {
+ if pCache.BrokenWait() {
node = cache.NewNode()
- node.Cache.Set(CACHE_FIND, item)
+ node.Cache.Set(CACHE_FIND, pCache)
return
}
cloneCtx := util.CloneContext(ctx)
cloneCtx = util.SetContext(cloneCtx, serviceUtil.CTX_NOCACHE, "1")
-
- insts, _, err := f.FindInstances(cloneCtx, item.ServiceIds)
+ insts, _, err := f.Find(cloneCtx, parent)
if err != nil {
- item.InitBrokenQueue()
+ pCache.InitBrokenQueue()
return nil, err
}
- log.Warnf("the cache of finding instances api is broken,
req[%s]!=cache[%s]",
- requestRev, item.Rev)
- item.Instances = insts
- item.Broken()
+ log.Warnf("the cache of finding instances api is broken,
req[%s]!=cache[%s][%s]",
+ requestRev, pCache.Rev, parent.Name)
+ pCache.Instances = insts
+ pCache.Broken()
node = cache.NewNode()
- node.Cache.Set(CACHE_FIND, item)
+ node.Cache.Set(CACHE_FIND, pCache)
return
}
diff --git a/server/service/cache/filter_version.go
b/server/service/cache/filter_version.go
index 0c47f6c..099d261 100644
--- a/server/service/cache/filter_version.go
+++ b/server/service/cache/filter_version.go
@@ -34,6 +34,15 @@ func (f *VersionRuleFilter) Name(ctx context.Context, _
*cache.Node) string {
}
func (f *VersionRuleFilter) Init(ctx context.Context, parent *cache.Node)
(node *cache.Node, err error) {
+ instance, ok :=
ctx.Value(CTX_FIND_PROVIDER_INSTANCE).(*pb.HeartbeatSetElement)
+ if ok {
+ node = cache.NewNode()
+ node.Cache.Set(CACHE_FIND, &VersionRuleCacheItem{
+ ServiceIds: []string{instance.ServiceId},
+ })
+ return
+ }
+
provider := ctx.Value(CTX_FIND_PROVIDER).(*pb.MicroServiceKey)
// 版本规则
ids, exist, err := serviceUtil.FindServiceIds(ctx, provider.Version,
provider)
@@ -41,7 +50,7 @@ func (f *VersionRuleFilter) Init(ctx context.Context, parent
*cache.Node) (node
consumer := ctx.Value(CTX_FIND_CONSUMER).(*pb.MicroService)
findFlag := fmt.Sprintf("consumer '%s' find provider %s/%s/%s",
consumer.ServiceId,
provider.AppId, provider.ServiceName, provider.Version)
- log.Errorf(err, "VersionRuleFilter failed, %s", findFlag)
+ log.Errorf(err, "FindServiceIds failed, %s", findFlag)
return
}
if !exist {
@@ -50,8 +59,7 @@ func (f *VersionRuleFilter) Init(ctx context.Context, parent
*cache.Node) (node
node = cache.NewNode()
node.Cache.Set(CACHE_FIND, &VersionRuleCacheItem{
- VersionRule: provider.Version,
- ServiceIds: ids,
+ ServiceIds: ids,
})
return
}
diff --git a/server/service/cache/instance.go b/server/service/cache/instance.go
index f7e7f30..b6ec609 100644
--- a/server/service/cache/instance.go
+++ b/server/service/cache/instance.go
@@ -42,10 +42,9 @@ func init() {
}
type VersionRuleCacheItem struct {
- VersionRule string
- ServiceIds []string
- Instances []*pb.MicroServiceInstance
- Rev string
+ ServiceIds []string
+ Instances []*pb.MicroServiceInstance
+ Rev string
broken bool
queue chan struct{}
@@ -88,6 +87,12 @@ func (f *FindInstancesCache) Get(ctx context.Context,
consumer *pb.MicroService,
return node.Cache.Get(CACHE_FIND).(*VersionRuleCacheItem), nil
}
+func (f *FindInstancesCache) GetWithProviderId(ctx context.Context, consumer
*pb.MicroService, provider *pb.MicroServiceKey,
+ instanceKey *pb.HeartbeatSetElement, tags []string, rev string)
(*VersionRuleCacheItem, error) {
+ cloneCtx := context.WithValue(ctx, CTX_FIND_PROVIDER_INSTANCE,
instanceKey)
+ return f.Get(cloneCtx, consumer, provider, tags, rev)
+}
+
func (f *FindInstancesCache) Remove(provider *pb.MicroServiceKey) {
f.Tree.Remove(context.WithValue(context.Background(),
CTX_FIND_PROVIDER, provider))
if len(provider.Alias) > 0 {
diff --git a/server/service/instance.go b/server/service/instance.go
index 53eca31..a630958 100644
--- a/server/service/instance.go
+++ b/server/service/instance.go
@@ -394,108 +394,172 @@ func getHeartbeatFunc(ctx context.Context,
domainProject string, instancesHbRst
}
func (s *InstanceService) GetOneInstance(ctx context.Context, in
*pb.GetOneInstanceRequest) (*pb.GetOneInstanceResponse, error) {
- if err := Validate(in); err != nil {
+ err := Validate(in)
+ if err != nil {
log.Errorf(err, "get instance failed: invalid parameters")
return &pb.GetOneInstanceResponse{
Response: pb.CreateResponse(scerr.ErrInvalidParams,
err.Error()),
}, nil
}
- cpFunc := func() string {
- return fmt.Sprintf("consumer[%s] get provider instance[%s/%s]",
- in.ConsumerServiceId, in.ProviderServiceId,
in.ProviderInstanceId)
- }
+ domainProject := util.ParseDomainProject(ctx)
- if checkErr := s.getInstancePreCheck(ctx, in.ProviderServiceId,
in.ConsumerServiceId, in.Tags); checkErr != nil {
- log.Errorf(checkErr, "%s failed: pre check failed", cpFunc())
- resp := &pb.GetOneInstanceResponse{
- Response: pb.CreateResponseWithSCErr(checkErr),
+ service := &pb.MicroService{}
+ if len(in.ConsumerServiceId) > 0 {
+ service, err = serviceUtil.GetService(ctx, domainProject,
in.ConsumerServiceId)
+ if err != nil {
+ log.Errorf(err, "get consumer failed, consumer[%s] find provider instance[%s/%s]",
+ in.ConsumerServiceId, in.ProviderServiceId, in.ProviderInstanceId)
+ return &pb.GetOneInstanceResponse{
+ Response: pb.CreateResponse(scerr.ErrInternal,
err.Error()),
+ }, err
}
- if checkErr.InternalError() {
- return resp, checkErr
+ if service == nil {
+ log.Errorf(nil, "consumer does not exist, consumer[%s]
find provider instance[%s/%s]",
+ in.ConsumerServiceId, in.ProviderServiceId,
in.ProviderInstanceId)
+ return &pb.GetOneInstanceResponse{
+ Response:
pb.CreateResponse(scerr.ErrServiceNotExists,
+ fmt.Sprintf("Consumer[%s] does not
exist.", in.ConsumerServiceId)),
+ }, nil
}
- return resp, nil
}
- serviceId := in.ProviderServiceId
- instanceId := in.ProviderInstanceId
- instance, err := serviceUtil.GetInstance(ctx,
util.ParseTargetDomainProject(ctx), serviceId, instanceId)
+ provider, err := serviceUtil.GetService(ctx, domainProject,
in.ProviderServiceId)
if err != nil {
- log.Errorf(err, "%s failed: get instance failed", cpFunc())
+ log.Errorf(err, "get provider failed, consumer[%s] find
provider instance[%s/%s]",
+ in.ConsumerServiceId, in.ProviderServiceId,
in.ProviderInstanceId)
return &pb.GetOneInstanceResponse{
Response: pb.CreateResponse(scerr.ErrInternal,
err.Error()),
}, err
}
- if instance == nil {
- log.Errorf(nil, "%s failed: instance does not exist", cpFunc())
+ if provider == nil {
+ log.Errorf(nil, "provider does not exist, consumer[%s] find
provider instance[%s/%s]",
+ in.ConsumerServiceId, in.ProviderServiceId,
in.ProviderInstanceId)
return &pb.GetOneInstanceResponse{
- Response: pb.CreateResponse(scerr.ErrInstanceNotExists,
"Service instance does not exist."),
+ Response: pb.CreateResponse(scerr.ErrServiceNotExists,
+ fmt.Sprintf("Provider[%s] does not exist.",
in.ProviderServiceId)),
}, nil
}
+ findFlag := func() string {
+ return fmt.Sprintf("Consumer[%s][%s/%s/%s/%s] find
provider[%s][%s/%s/%s/%s] instance[%s]",
+ in.ConsumerServiceId, service.Environment,
service.AppId, service.ServiceName, service.Version,
+ provider.ServiceId, provider.Environment,
provider.AppId, provider.ServiceName, provider.Version,
+ in.ProviderInstanceId)
+ }
+
+ var item *cache.VersionRuleCacheItem
+ rev, _ := ctx.Value(serviceUtil.CTX_REQUEST_REVISION).(string)
+ item, err = cache.FindInstances.GetWithProviderId(ctx, service,
pb.MicroServiceToKey(domainProject, provider),
+ &pb.HeartbeatSetElement{
+ ServiceId: in.ProviderServiceId, InstanceId:
in.ProviderInstanceId,
+ }, in.Tags, rev)
+ if err != nil {
+ log.Errorf(err, "FindInstances.GetWithProviderId failed, %s
failed", findFlag())
+ return &pb.GetOneInstanceResponse{
+ Response: pb.CreateResponse(scerr.ErrInternal,
err.Error()),
+ }, err
+ }
+ if item == nil || len(item.Instances) == 0 {
+ mes := fmt.Errorf("%s failed, provider instance does not
exist.", findFlag())
+ log.Errorf(mes, "FindInstances.GetWithProviderId failed")
+ return &pb.GetOneInstanceResponse{
+ Response: pb.CreateResponse(scerr.ErrInstanceNotExists,
mes.Error()),
+ }, nil
+ }
+
+ instance := item.Instances[0]
+ if rev == item.Rev {
+ instance = nil // for gRPC
+ }
+ ctx = util.SetContext(ctx, serviceUtil.CTX_RESPONSE_REVISION, item.Rev)
+
return &pb.GetOneInstanceResponse{
Response: pb.CreateResponse(pb.Response_SUCCESS, "Get instance
successfully."),
Instance: instance,
}, nil
}
-func (s *InstanceService) getInstancePreCheck(ctx context.Context,
providerServiceId, consumerServiceId string, tags []string) *scerr.Error {
- targetDomainProject := util.ParseTargetDomainProject(ctx)
- if !serviceUtil.ServiceExist(ctx, targetDomainProject,
providerServiceId) {
- return scerr.NewError(scerr.ErrServiceNotExists, "Provider
serviceId is invalid")
+func (s *InstanceService) GetInstances(ctx context.Context, in
*pb.GetInstancesRequest) (*pb.GetInstancesResponse, error) {
+ err := Validate(in)
+ if err != nil {
+ log.Errorf(err, "get instances failed: invalid parameters")
+ return &pb.GetInstancesResponse{
+ Response: pb.CreateResponse(scerr.ErrInvalidParams,
err.Error()),
+ }, nil
}
- // Tag过滤
- if len(tags) > 0 {
- tagsFromETCD, err := serviceUtil.GetTagsUtils(ctx,
targetDomainProject, providerServiceId)
+ domainProject := util.ParseDomainProject(ctx)
+
+ service := &pb.MicroService{}
+ if len(in.ConsumerServiceId) > 0 {
+ service, err = serviceUtil.GetService(ctx, domainProject,
in.ConsumerServiceId)
if err != nil {
- return scerr.NewErrorf(scerr.ErrInternal, "An error
occurred in query provider tags(%s)", err.Error())
- }
- if len(tagsFromETCD) == 0 {
- return scerr.NewError(scerr.ErrTagNotExists, "Provider
has no tag")
+ log.Errorf(err, "get consumer failed, consumer[%s] find
provider instances",
+ in.ConsumerServiceId, in.ProviderServiceId)
+ return &pb.GetInstancesResponse{
+ Response: pb.CreateResponse(scerr.ErrInternal,
err.Error()),
+ }, err
}
- for _, tag := range tags {
- if _, ok := tagsFromETCD[tag]; !ok {
- return scerr.NewErrorf(scerr.ErrTagNotExists,
"Provider tags do not contain '%s'", tag)
- }
+ if service == nil {
+ log.Errorf(nil, "consumer does not exist, consumer[%s]
find provider instances",
+ in.ConsumerServiceId, in.ProviderServiceId)
+ return &pb.GetInstancesResponse{
+ Response:
pb.CreateResponse(scerr.ErrServiceNotExists,
+ fmt.Sprintf("Consumer[%s] does not
exist.", in.ConsumerServiceId)),
+ }, nil
}
}
- // 黑白名单
- // 跨应用调用
- return serviceUtil.Accessible(ctx, consumerServiceId, providerServiceId)
-}
-func (s *InstanceService) GetInstances(ctx context.Context, in
*pb.GetInstancesRequest) (*pb.GetInstancesResponse, error) {
- if err := Validate(in); err != nil {
- log.Errorf(err, "get instances failed: invalid parameters")
+ provider, err := serviceUtil.GetService(ctx, domainProject,
in.ProviderServiceId)
+ if err != nil {
+ log.Errorf(err, "get provider failed, consumer[%s] find
provider instances",
+ in.ConsumerServiceId, in.ProviderServiceId)
return &pb.GetInstancesResponse{
- Response: pb.CreateResponse(scerr.ErrInvalidParams,
err.Error()),
- }, nil
+ Response: pb.CreateResponse(scerr.ErrInternal,
err.Error()),
+ }, err
}
-
- cpFunc := func() string {
- return fmt.Sprintf("consumer[%s] get provider[%s] instances",
+ if provider == nil {
+ log.Errorf(nil, "provider does not exist, consumer[%s] find
provider instances",
in.ConsumerServiceId, in.ProviderServiceId)
+ return &pb.GetInstancesResponse{
+ Response: pb.CreateResponse(scerr.ErrServiceNotExists,
+ fmt.Sprintf("Provider[%s] does not exist.",
in.ProviderServiceId)),
+ }, nil
}
- if checkErr := s.getInstancePreCheck(ctx, in.ProviderServiceId,
in.ConsumerServiceId, in.Tags); checkErr != nil {
- log.Errorf(checkErr, "%s failed: pre check failed", cpFunc())
- resp := &pb.GetInstancesResponse{
- Response: pb.CreateResponseWithSCErr(checkErr),
- }
- if checkErr.InternalError() {
- return resp, checkErr
- }
- return resp, nil
+ findFlag := func() string {
+ return fmt.Sprintf("Consumer[%s][%s/%s/%s/%s] find
provider[%s][%s/%s/%s/%s] instances",
+ in.ConsumerServiceId, service.Environment,
service.AppId, service.ServiceName, service.Version,
+ provider.ServiceId, provider.Environment,
provider.AppId, provider.ServiceName, provider.Version)
}
- instances, err := serviceUtil.GetAllInstancesOfOneService(ctx,
util.ParseTargetDomainProject(ctx), in.ProviderServiceId)
+ var item *cache.VersionRuleCacheItem
+ rev, _ := ctx.Value(serviceUtil.CTX_REQUEST_REVISION).(string)
+ item, err = cache.FindInstances.GetWithProviderId(ctx, service,
pb.MicroServiceToKey(domainProject, provider),
+ &pb.HeartbeatSetElement{
+ ServiceId: in.ProviderServiceId,
+ }, in.Tags, rev)
if err != nil {
- log.Errorf(err, "%s failed", cpFunc())
+ log.Errorf(err, "FindInstances.GetWithProviderId failed, %s
failed", findFlag())
return &pb.GetInstancesResponse{
Response: pb.CreateResponse(scerr.ErrInternal,
err.Error()),
}, err
}
+ if item == nil || len(item.ServiceIds) == 0 {
+ mes := fmt.Errorf("%s failed, provider instance does not
exist.", findFlag())
+ log.Errorf(mes, "FindInstances.GetWithProviderId failed")
+ return &pb.GetInstancesResponse{
+ Response: pb.CreateResponse(scerr.ErrServiceNotExists,
mes.Error()),
+ }, nil
+ }
+
+ instances := item.Instances
+ if rev == item.Rev {
+ instances = nil // for gRPC
+ }
+ ctx = util.SetContext(ctx, serviceUtil.CTX_RESPONSE_REVISION, item.Rev)
+
return &pb.GetInstancesResponse{
Response: pb.CreateResponse(pb.Response_SUCCESS, "Query
service instances successfully."),
Instances: instances,
@@ -617,6 +681,14 @@ func (s *InstanceService) Find(ctx context.Context, in
*pb.FindInstancesRequest)
}
func (s *InstanceService) BatchFind(ctx context.Context, in
*pb.BatchFindInstancesRequest) (*pb.BatchFindInstancesResponse, error) {
+ if len(in.Services) == 0 && len(in.Instances) == 0 {
+ err := errors.New("Required services or instances")
+ log.Errorf(err, "batch find instance failed: invalid parameters")
+ return &pb.BatchFindInstancesResponse{
+ Response: pb.CreateResponse(scerr.ErrInvalidParams,
err.Error()),
+ }, nil
+ }
+
err := Validate(in)
if err != nil {
log.Errorf(err, "batch find instance failed: invalid
parameters")
@@ -628,6 +700,32 @@ func (s *InstanceService) BatchFind(ctx context.Context,
in *pb.BatchFindInstanc
response := &pb.BatchFindInstancesResponse{
Response: pb.CreateResponse(pb.Response_SUCCESS, "Batch query
service instances successfully."),
}
+
+ // find services
+ response.Services, err = s.batchFindServices(ctx, in)
+ if err != nil {
+ return &pb.BatchFindInstancesResponse{
+ Response: pb.CreateResponse(scerr.ErrInternal,
err.Error()),
+ }, err
+ }
+
+ // find instance
+ response.Instances, err = s.batchFindInstances(ctx, in)
+ if err != nil {
+ return &pb.BatchFindInstancesResponse{
+ Response: pb.CreateResponse(scerr.ErrInternal,
err.Error()),
+ }, err
+ }
+
+ return response, nil
+}
+
+func (s *InstanceService) batchFindServices(ctx context.Context, in
*pb.BatchFindInstancesRequest) (*pb.BatchFindResult, error) {
+ if len(in.Services) == 0 {
+ return nil, nil
+ }
+
+ services := &pb.BatchFindResult{}
failedResult := make(map[int32]*pb.FindFailedResult)
for index, key := range in.Services {
cloneCtx := util.SetContext(ctx,
serviceUtil.CTX_REQUEST_REVISION, key.Rev)
@@ -639,21 +737,49 @@ func (s *InstanceService) BatchFind(ctx context.Context,
in *pb.BatchFindInstanc
Environment: key.Service.Environment,
})
if err != nil {
- return &pb.BatchFindInstancesResponse{
- Response: resp.Response,
- }, err
+ return nil, err
}
failed, ok := failedResult[resp.GetResponse().GetCode()]
- serviceUtil.AppendFindResponse(cloneCtx, int64(index), resp,
- &response.Updated, &response.NotModified, &failed)
+ serviceUtil.AppendFindResponse(cloneCtx, int64(index),
resp.GetResponse(), resp.GetInstances(),
+ &services.Updated, &services.NotModified, &failed)
if !ok && failed != nil {
failedResult[resp.GetResponse().GetCode()] = failed
}
}
for _, result := range failedResult {
- response.Failed = append(response.Failed, result)
+ services.Failed = append(services.Failed, result)
}
- return response, nil
+ return services, nil
+}
+
+func (s *InstanceService) batchFindInstances(ctx context.Context, in
*pb.BatchFindInstancesRequest) (*pb.BatchFindResult, error) {
+ if len(in.Instances) == 0 {
+ return nil, nil
+ }
+
+ instances := &pb.BatchFindResult{}
+ failedResult := make(map[int32]*pb.FindFailedResult)
+ for index, key := range in.Instances {
+ cloneCtx := util.SetContext(ctx,
serviceUtil.CTX_REQUEST_REVISION, key.Rev)
+ resp, err := s.GetOneInstance(cloneCtx,
&pb.GetOneInstanceRequest{
+ ConsumerServiceId: in.ConsumerServiceId,
+ ProviderServiceId: key.Instance.ServiceId,
+ ProviderInstanceId: key.Instance.InstanceId,
+ })
+ if err != nil {
+ return nil, err
+ }
+ failed, ok := failedResult[resp.GetResponse().GetCode()]
+ serviceUtil.AppendFindResponse(cloneCtx, int64(index),
resp.GetResponse(), []*pb.MicroServiceInstance{resp.GetInstance()},
+ &instances.Updated, &instances.NotModified, &failed)
+ if !ok && failed != nil {
+ failedResult[resp.GetResponse().GetCode()] = failed
+ }
+ }
+ for _, result := range failedResult {
+ instances.Failed = append(instances.Failed, result)
+ }
+ return instances, nil
}
func (s *InstanceService) reshapeProviderKey(ctx context.Context, provider
*pb.MicroServiceKey, providerId string) (*pb.MicroServiceKey, error) {
diff --git a/server/service/instance_test.go b/server/service/instance_test.go
index 25b654f..4359c36 100644
--- a/server/service/instance_test.go
+++ b/server/service/instance_test.go
@@ -829,11 +829,13 @@ var _ = Describe("'Instance' service", func() {
serviceId6 string
serviceId7 string
serviceId8 string
+ serviceId9 string
instanceId1 string
instanceId2 string
instanceId4 string
instanceId5 string
instanceId8 string
+ instanceId9 string
)
It("should be passed", func() {
@@ -948,6 +950,19 @@ var _ = Describe("'Instance' service", func() {
Expect(respCreate.Response.Code).To(Equal(pb.Response_SUCCESS))
serviceId8 = respCreate.ServiceId
+ respCreate, err = serviceResource.Create(getContext(),
&pb.CreateServiceRequest{
+ Service: &pb.MicroService{
+ AppId: "query_instance",
+ ServiceName:
"batch_query_instance_with_rev",
+ Version: "1.0.0",
+ Level: "FRONT",
+ Status: pb.MS_UP,
+ },
+ })
+ Expect(err).To(BeNil())
+
Expect(respCreate.Response.Code).To(Equal(pb.Response_SUCCESS))
+ serviceId9 = respCreate.ServiceId
+
resp, err := instanceResource.Register(getContext(),
&pb.RegisterInstanceRequest{
Instance: &pb.MicroServiceInstance{
ServiceId: serviceId1,
@@ -1017,6 +1032,20 @@ var _ = Describe("'Instance' service", func() {
Expect(err).To(BeNil())
Expect(resp.Response.Code).To(Equal(pb.Response_SUCCESS))
instanceId8 = resp.InstanceId
+
+ resp, err = instanceResource.Register(getContext(),
&pb.RegisterInstanceRequest{
+ Instance: &pb.MicroServiceInstance{
+ ServiceId: serviceId9,
+ HostName: "UT-HOST",
+ Endpoints: []string{
+ "find:127.0.0.9:8080",
+ },
+ Status: pb.MSI_UP,
+ },
+ })
+ Expect(err).To(BeNil())
+
Expect(resp.Response.Code).To(Equal(pb.Response_SUCCESS))
+ instanceId9 = resp.InstanceId
})
Context("when query invalid parameters", func() {
@@ -1177,12 +1206,14 @@ var _ = Describe("'Instance' service", func() {
respFind, err :=
instanceResource.BatchFind(getContext(), &pb.BatchFindInstancesRequest{
ConsumerServiceId: serviceId1,
Services: nil,
+ Instances: nil,
})
Expect(err).To(BeNil())
Expect(respFind.Response.Code).To(Equal(scerr.ErrInvalidParams))
respFind, err =
instanceResource.BatchFind(getContext(), &pb.BatchFindInstancesRequest{
ConsumerServiceId: serviceId1,
Services: []*pb.FindService{},
+ Instances: []*pb.FindInstance{},
})
Expect(err).To(BeNil())
Expect(respFind.Response.Code).To(Equal(scerr.ErrInvalidParams))
@@ -1192,6 +1223,12 @@ var _ = Describe("'Instance' service", func() {
})
Expect(err).To(BeNil())
Expect(respFind.Response.Code).To(Equal(scerr.ErrInvalidParams))
+ respFind, err =
instanceResource.BatchFind(getContext(), &pb.BatchFindInstancesRequest{
+ ConsumerServiceId: serviceId1,
+ Instances:
[]*pb.FindInstance{{}},
+ })
+ Expect(err).To(BeNil())
+
Expect(respFind.Response.Code).To(Equal(scerr.ErrInvalidParams))
By("invalid appId")
respFind, err =
instanceResource.BatchFind(getContext(), &pb.BatchFindInstancesRequest{
@@ -1339,6 +1376,32 @@ var _ = Describe("'Instance' service", func() {
Expect(err).To(BeNil())
Expect(respFind.Response.Code).To(Equal(scerr.ErrInvalidParams))
+ By("invalid instance")
+ respFind, err =
instanceResource.BatchFind(getContext(), &pb.BatchFindInstancesRequest{
+ ConsumerServiceId: serviceId1,
+ Instances: []*pb.FindInstance{
+ {
+ Instance:
&pb.HeartbeatSetElement{
+ ServiceId:
"query_instance",
+ },
+ },
+ },
+ })
+ Expect(err).To(BeNil())
+
Expect(respFind.Response.Code).To(Equal(scerr.ErrInvalidParams))
+ respFind, err =
instanceResource.BatchFind(getContext(), &pb.BatchFindInstancesRequest{
+ ConsumerServiceId: serviceId1,
+ Instances: []*pb.FindInstance{
+ {
+ Instance:
&pb.HeartbeatSetElement{
+ InstanceId:
"query_instance",
+ },
+ },
+ },
+ })
+ Expect(err).To(BeNil())
+
Expect(respFind.Response.Code).To(Equal(scerr.ErrInvalidParams))
+
By("consumerId is empty")
respFind, err =
instanceResource.BatchFind(getContext(), &pb.BatchFindInstancesRequest{
ConsumerServiceId: serviceId1,
@@ -1370,8 +1433,23 @@ var _ = Describe("'Instance' service", func() {
})
Expect(err).To(BeNil())
Expect(respFind.Response.Code).To(Equal(pb.Response_SUCCESS))
-
Expect(respFind.Failed[0].Error.Code).To(Equal(scerr.ErrServiceNotExists))
-
Expect(respFind.Failed[0].Indexes[0]).To(Equal(int64(0)))
+
Expect(respFind.Services.Failed[0].Error.Code).To(Equal(scerr.ErrServiceNotExists))
+
Expect(respFind.Services.Failed[0].Indexes[0]).To(Equal(int64(0)))
+ respFind, err =
instanceResource.BatchFind(getContext(), &pb.BatchFindInstancesRequest{
+ ConsumerServiceId: serviceId1,
+ Instances: []*pb.FindInstance{
+ {
+ Instance:
&pb.HeartbeatSetElement{
+ ServiceId:
serviceId1,
+ InstanceId:
"noninstance",
+ },
+ },
+ },
+ })
+ Expect(err).To(BeNil())
+
Expect(respFind.Response.Code).To(Equal(pb.Response_SUCCESS))
+
Expect(respFind.Instances.Failed[0].Error.Code).To(Equal(scerr.ErrInstanceNotExists))
+
Expect(respFind.Instances.Failed[0].Indexes[0]).To(Equal(int64(0)))
By("provider does not contain 3.0.0+ versions")
respFind, err =
instanceResource.BatchFind(getContext(), &pb.BatchFindInstancesRequest{
@@ -1388,9 +1466,9 @@ var _ = Describe("'Instance' service", func() {
})
Expect(err).To(BeNil())
Expect(respFind.Response.Code).To(Equal(pb.Response_SUCCESS))
-
Expect(len(respFind.Updated[0].Instances)).To(Equal(0))
-
Expect(respFind.Updated[0].Index).To(Equal(int64(0)))
- Expect(respFind.Updated[0].Rev).ToNot(Equal(""))
+
Expect(len(respFind.Services.Updated[0].Instances)).To(Equal(0))
+
Expect(respFind.Services.Updated[0].Index).To(Equal(int64(0)))
+
Expect(respFind.Services.Updated[0].Rev).ToNot(Equal(""))
By("consumer does not exist")
respFind, err =
instanceResource.BatchFind(getContext(), &pb.BatchFindInstancesRequest{
@@ -1407,8 +1485,8 @@ var _ = Describe("'Instance' service", func() {
})
Expect(err).To(BeNil())
Expect(respFind.Response.Code).To(Equal(pb.Response_SUCCESS))
-
Expect(respFind.Failed[0].Indexes[0]).To(Equal(int64(0)))
-
Expect(respFind.Failed[0].Error.Code).To(Equal(scerr.ErrServiceNotExists))
+
Expect(respFind.Services.Failed[0].Indexes[0]).To(Equal(int64(0)))
+
Expect(respFind.Services.Failed[0].Error.Code).To(Equal(scerr.ErrServiceNotExists))
})
})
@@ -1626,12 +1704,12 @@ var _ = Describe("'Instance' service", func() {
})
Expect(err).To(BeNil())
Expect(respFind.Response.Code).To(Equal(pb.Response_SUCCESS))
-
Expect(respFind.Updated[0].Index).To(Equal(int64(0)))
-
Expect(respFind.Updated[0].Instances[0].InstanceId).To(Equal(instanceId2))
-
Expect(respFind.Updated[1].Index).To(Equal(int64(1)))
-
Expect(respFind.Updated[1].Instances[0].InstanceId).To(Equal(instanceId2))
-
Expect(respFind.Failed[0].Indexes[0]).To(Equal(int64(2)))
-
Expect(respFind.Failed[0].Error.Code).To(Equal(scerr.ErrServiceNotExists))
+
Expect(respFind.Services.Updated[0].Index).To(Equal(int64(0)))
+
Expect(respFind.Services.Updated[0].Instances[0].InstanceId).To(Equal(instanceId2))
+
Expect(respFind.Services.Updated[1].Index).To(Equal(int64(1)))
+
Expect(respFind.Services.Updated[1].Instances[0].InstanceId).To(Equal(instanceId2))
+
Expect(respFind.Services.Failed[0].Indexes[0]).To(Equal(int64(2)))
+
Expect(respFind.Services.Failed[0].Error.Code).To(Equal(scerr.ErrServiceNotExists))
By("find with env")
respFind, err =
instanceResource.BatchFind(getContext(), &pb.BatchFindInstancesRequest{
@@ -1648,8 +1726,8 @@ var _ = Describe("'Instance' service", func() {
})
Expect(err).To(BeNil())
Expect(respFind.Response.Code).To(Equal(pb.Response_SUCCESS))
-
Expect(len(respFind.Updated[0].Instances)).To(Equal(1))
-
Expect(respFind.Updated[0].Instances[0].InstanceId).To(Equal(instanceId4))
+
Expect(len(respFind.Services.Updated[0].Instances)).To(Equal(1))
+
Expect(respFind.Services.Updated[0].Instances[0].InstanceId).To(Equal(instanceId4))
respFind, err =
instanceResource.BatchFind(getContext(), &pb.BatchFindInstancesRequest{
Services: []*pb.FindService{
@@ -1665,8 +1743,8 @@ var _ = Describe("'Instance' service", func() {
})
Expect(err).To(BeNil())
Expect(respFind.Response.Code).To(Equal(pb.Response_SUCCESS))
-
Expect(len(respFind.Updated[0].Instances)).To(Equal(1))
-
Expect(respFind.Updated[0].Instances[0].InstanceId).To(Equal(instanceId4))
+
Expect(len(respFind.Services.Updated[0].Instances)).To(Equal(1))
+
Expect(respFind.Services.Updated[0].Instances[0].InstanceId).To(Equal(instanceId4))
By("find with rev")
ctx := util.SetContext(getContext(),
serviceUtil.CTX_NOCACHE, "")
@@ -1680,13 +1758,43 @@ var _ = Describe("'Instance' service", func() {
Version:
"1.0.0",
},
},
+ {
+ Service:
&pb.MicroServiceKey{
+ AppId:
"query_instance",
+ ServiceName:
"batch_query_instance_with_rev",
+ Version:
"1.0.0",
+ },
+ },
+ },
+ Instances: []*pb.FindInstance{
+ {
+ Instance:
&pb.HeartbeatSetElement{
+ ServiceId:
serviceId9,
+ InstanceId:
instanceId9,
+ },
+ },
+ {
+ Instance:
&pb.HeartbeatSetElement{
+ ServiceId:
serviceId8,
+ InstanceId:
instanceId8,
+ },
+ },
},
})
Expect(err).To(BeNil())
Expect(respFind.Response.Code).To(Equal(pb.Response_SUCCESS))
- rev := respFind.Updated[0].Rev
-
Expect(respFind.Updated[0].Instances[0].InstanceId).To(Equal(instanceId8))
+ rev := respFind.Services.Updated[0].Rev
+
Expect(respFind.Services.Updated[0].Index).To(Equal(int64(0)))
+
Expect(respFind.Services.Updated[1].Index).To(Equal(int64(1)))
+
Expect(respFind.Services.Updated[0].Instances[0].InstanceId).To(Equal(instanceId8))
+
Expect(respFind.Services.Updated[1].Instances[0].InstanceId).To(Equal(instanceId9))
Expect(len(rev)).NotTo(Equal(0))
+ instanceRev := respFind.Instances.Updated[0].Rev
+
Expect(respFind.Instances.Updated[0].Index).To(Equal(int64(0)))
+
Expect(respFind.Instances.Updated[1].Index).To(Equal(int64(1)))
+
Expect(respFind.Instances.Updated[0].Instances[0].InstanceId).To(Equal(instanceId9))
+
Expect(respFind.Instances.Updated[1].Instances[0].InstanceId).To(Equal(instanceId8))
+ Expect(len(instanceRev)).NotTo(Equal(0))
respFind, err = instanceResource.BatchFind(ctx,
&pb.BatchFindInstancesRequest{
ConsumerServiceId: serviceId8,
@@ -1700,11 +1808,22 @@ var _ = Describe("'Instance' service", func() {
Rev: "x",
},
},
+ Instances: []*pb.FindInstance{
+ {
+ Instance:
&pb.HeartbeatSetElement{
+ ServiceId:
serviceId9,
+ InstanceId:
instanceId9,
+ },
+ Rev: "x",
+ },
+ },
})
Expect(err).To(BeNil())
Expect(respFind.Response.Code).To(Equal(pb.Response_SUCCESS))
-
Expect(respFind.Updated[0].Instances[0].InstanceId).To(Equal(instanceId8))
- Expect(respFind.Updated[0].Rev).To(Equal(rev))
+
Expect(respFind.Services.Updated[0].Instances[0].InstanceId).To(Equal(instanceId8))
+
Expect(respFind.Services.Updated[0].Rev).To(Equal(rev))
+
Expect(respFind.Instances.Updated[0].Instances[0].InstanceId).To(Equal(instanceId9))
+
Expect(respFind.Instances.Updated[0].Rev).To(Equal(instanceRev))
respFind, err = instanceResource.BatchFind(ctx,
&pb.BatchFindInstancesRequest{
ConsumerServiceId: serviceId8,
@@ -1718,10 +1837,20 @@ var _ = Describe("'Instance' service", func() {
Rev: rev,
},
},
+ Instances: []*pb.FindInstance{
+ {
+ Instance:
&pb.HeartbeatSetElement{
+ ServiceId:
serviceId9,
+ InstanceId:
instanceId9,
+ },
+ Rev: instanceRev,
+ },
+ },
})
Expect(err).To(BeNil())
Expect(respFind.Response.Code).To(Equal(pb.Response_SUCCESS))
-
Expect(respFind.NotModified[0]).To(Equal(int64(0)))
+
Expect(respFind.Services.NotModified[0]).To(Equal(int64(0)))
+
Expect(respFind.Instances.NotModified[0]).To(Equal(int64(0)))
By("find should return 200 even if consumer is
diff apps")
respFind, err =
instanceResource.BatchFind(getContext(), &pb.BatchFindInstancesRequest{
@@ -1738,7 +1867,7 @@ var _ = Describe("'Instance' service", func() {
})
Expect(err).To(BeNil())
Expect(respFind.Response.Code).To(Equal(pb.Response_SUCCESS))
-
Expect(len(respFind.Updated[0].Instances)).To(Equal(0))
+
Expect(len(respFind.Services.Updated[0].Instances)).To(Equal(0))
By("shared service discovery")
os.Setenv("CSE_SHARED_SERVICES",
"query_instance_shared_provider")
@@ -1763,8 +1892,8 @@ var _ = Describe("'Instance' service", func() {
})
Expect(err).To(BeNil())
Expect(respFind.Response.Code).To(Equal(pb.Response_SUCCESS))
-
Expect(len(respFind.Updated[0].Instances)).To(Equal(1))
-
Expect(respFind.Updated[0].Instances[0].InstanceId).To(Equal(instanceId5))
+
Expect(len(respFind.Services.Updated[0].Instances)).To(Equal(1))
+
Expect(respFind.Services.Updated[0].Instances[0].InstanceId).To(Equal(instanceId5))
respFind, err =
instanceResource.BatchFind(getContext(), &pb.BatchFindInstancesRequest{
ConsumerServiceId: serviceId7,
@@ -1780,8 +1909,8 @@ var _ = Describe("'Instance' service", func() {
})
Expect(err).To(BeNil())
Expect(respFind.Response.Code).To(Equal(pb.Response_SUCCESS))
-
Expect(len(respFind.Updated[0].Instances)).To(Equal(1))
-
Expect(respFind.Updated[0].Instances[0].InstanceId).To(Equal(instanceId5))
+
Expect(len(respFind.Services.Updated[0].Instances)).To(Equal(1))
+
Expect(respFind.Services.Updated[0].Instances[0].InstanceId).To(Equal(instanceId5))
core.Service.Environment = pb.ENV_DEV
})
@@ -1799,7 +1928,7 @@ var _ = Describe("'Instance' service", func() {
Expect(respFind.Response.Code).To(Equal(code))
}
- UTFunc(serviceId3, scerr.ErrPermissionDeny)
+ UTFunc(serviceId3, scerr.ErrServiceNotExists)
UTFunc(serviceId1, pb.Response_SUCCESS)
@@ -1934,7 +2063,7 @@ var _ = Describe("'Instance' service", func() {
Tags:
[]string{"not-exist-tag"},
})
Expect(err).To(BeNil())
-
Expect(resp.Response.Code).ToNot(Equal(pb.Response_SUCCESS))
+
Expect(resp.Response.Code).To(Equal(scerr.ErrInstanceNotExists))
By("provider tag exist")
resp, err =
instanceResource.GetOneInstance(getContext(),
@@ -1957,7 +2086,7 @@ var _ = Describe("'Instance' service", func() {
ProviderInstanceId: instanceId2,
})
Expect(err).To(BeNil())
-
Expect(resp.Response.Code).ToNot(Equal(pb.Response_SUCCESS))
+
Expect(resp.Response.Code).To(Equal(scerr.ErrInstanceNotExists))
respAll, err :=
instanceResource.GetInstances(getContext(), &pb.GetInstancesRequest{
ConsumerServiceId: serviceId3,
diff --git a/server/service/instance_validator.go
b/server/service/instance_validator.go
index bebb2c3..dde95f9 100644
--- a/server/service/instance_validator.go
+++ b/server/service/instance_validator.go
@@ -62,9 +62,12 @@ func BatchFindInstanceReqValidator() *validate.Validator {
var findServiceValidator validate.Validator
findServiceValidator.AddRule("Service",
&validate.ValidateRule{Min: 1})
findServiceValidator.AddSub("Service", ExistenceReqValidator())
+ var findInstanceValidator validate.Validator
+ findInstanceValidator.AddRule("Instance",
&validate.ValidateRule{Min: 1})
+ findInstanceValidator.AddSub("Instance",
HeartbeatReqValidator())
v.AddRule("ConsumerServiceId",
GetInstanceReqValidator().GetRule("ConsumerServiceId"))
- v.AddRule("Services", &validate.ValidateRule{Min: 1})
v.AddSub("Services", &findServiceValidator)
+ v.AddSub("Instances", &findInstanceValidator)
})
}
diff --git a/server/service/rule_test.go b/server/service/rule_test.go
index 0603d4c..2a6f995 100644
--- a/server/service/rule_test.go
+++ b/server/service/rule_test.go
@@ -681,7 +681,7 @@ var _ = Describe("'Rule' service", func() {
ProviderServiceId: providerBlack,
})
Expect(err).To(BeNil())
-
Expect(resp.Response.Code).To(Equal(scerr.ErrPermissionDeny))
+
Expect(resp.Response.Code).To(Equal(scerr.ErrServiceNotExists))
By("consumer tag in black list")
resp, err =
instanceResource.GetInstances(getContext(), &pb.GetInstancesRequest{
@@ -689,7 +689,7 @@ var _ = Describe("'Rule' service", func() {
ProviderServiceId: providerBlack,
})
Expect(err).To(BeNil())
-
Expect(resp.Response.Code).To(Equal(scerr.ErrPermissionDeny))
+
Expect(resp.Response.Code).To(Equal(scerr.ErrServiceNotExists))
By("find should return 200 even if consumer
permission deny")
respFind, err :=
instanceResource.Find(getContext(), &pb.FindInstancesRequest{
@@ -725,7 +725,7 @@ var _ = Describe("'Rule' service", func() {
ProviderServiceId: providerWhite,
})
Expect(err).To(BeNil())
-
Expect(resp.Response.Code).To(Equal(scerr.ErrPermissionDeny))
+
Expect(resp.Response.Code).To(Equal(scerr.ErrServiceNotExists))
By("consumer version in white list")
resp, err =
instanceResource.GetInstances(getContext(), &pb.GetInstancesRequest{
diff --git a/server/service/util/instance_util.go
b/server/service/util/instance_util.go
index 1e0e791..fea3692 100644
--- a/server/service/util/instance_util.go
+++ b/server/service/util/instance_util.go
@@ -273,12 +273,12 @@ func UpdateInstance(ctx context.Context, domainProject
string, instance *pb.Micr
return nil
}
-func AppendFindResponse(ctx context.Context, index int64, find
*pb.FindInstancesResponse,
+func AppendFindResponse(ctx context.Context, index int64, resp *pb.Response,
instances []*pb.MicroServiceInstance,
updatedResult *[]*pb.FindResult, notModifiedResult *[]int64,
failedResult **pb.FindFailedResult) {
- if code := find.GetResponse().GetCode(); code != pb.Response_SUCCESS {
+ if code := resp.GetCode(); code != pb.Response_SUCCESS {
if *failedResult == nil {
*failedResult = &pb.FindFailedResult{
- Error: scerr.NewError(code,
find.GetResponse().GetMessage()),
+ Error: scerr.NewError(code, resp.GetMessage()),
}
}
(*failedResult).Indexes = append((*failedResult).Indexes, index)
@@ -292,7 +292,7 @@ func AppendFindResponse(ctx context.Context, index int64,
find *pb.FindInstances
}
*updatedResult = append(*updatedResult, &pb.FindResult{
Index: index,
- Instances: find.Instances,
+ Instances: instances,
Rev: ov,
})
}
diff --git a/server/service/util/instance_util_test.go
b/server/service/util/instance_util_test.go
index 50087f2..9e9f72d 100644
--- a/server/service/util/instance_util_test.go
+++ b/server/service/util/instance_util_test.go
@@ -125,7 +125,7 @@ func TestAppendFindResponse(t *testing.T) {
notModifiedResult []int64
failedResult *pb.FindFailedResult
)
- AppendFindResponse(ctx, 1, &find, &updatedResult, &notModifiedResult,
&failedResult)
+ AppendFindResponse(ctx, 1, find.GetResponse(), find.Instances,
&updatedResult, &notModifiedResult, &failedResult)
if updatedResult == nil || notModifiedResult != nil || failedResult !=
nil {
t.Fatal("TestAppendFindResponse failed")
}
@@ -135,7 +135,7 @@ func TestAppendFindResponse(t *testing.T) {
updatedResult = nil
cloneCtx := context.WithValue(ctx, CTX_RESPONSE_REVISION, "1")
- AppendFindResponse(cloneCtx, 1, &find, &updatedResult,
&notModifiedResult, &failedResult)
+ AppendFindResponse(cloneCtx, 1, find.GetResponse(), find.Instances,
&updatedResult, &notModifiedResult, &failedResult)
if updatedResult == nil || notModifiedResult != nil || failedResult !=
nil {
t.Fatal("TestAppendFindResponse failed")
}
@@ -146,7 +146,7 @@ func TestAppendFindResponse(t *testing.T) {
updatedResult = nil
cloneCtx = context.WithValue(ctx, CTX_REQUEST_REVISION, "1")
cloneCtx = context.WithValue(cloneCtx, CTX_RESPONSE_REVISION, "1")
- AppendFindResponse(cloneCtx, 1, &find, &updatedResult,
&notModifiedResult, &failedResult)
+ AppendFindResponse(cloneCtx, 1, find.GetResponse(), find.Instances,
&updatedResult, &notModifiedResult, &failedResult)
if updatedResult != nil || notModifiedResult == nil || failedResult !=
nil {
t.Fatal("TestAppendFindResponse failed")
}
@@ -156,7 +156,7 @@ func TestAppendFindResponse(t *testing.T) {
notModifiedResult = nil
find.Response = pb.CreateResponse(scerr.ErrInternal, "test")
- AppendFindResponse(ctx, 1, &find, &updatedResult, &notModifiedResult,
&failedResult)
+ AppendFindResponse(ctx, 1, find.GetResponse(), find.Instances,
&updatedResult, &notModifiedResult, &failedResult)
if updatedResult != nil || notModifiedResult != nil || failedResult ==
nil {
t.Fatal("TestAppendFindResponse failed")
}
@@ -164,7 +164,7 @@ func TestAppendFindResponse(t *testing.T) {
t.Fatal("TestAppendFindResponse failed")
}
find.Response = pb.CreateResponse(scerr.ErrInvalidParams, "test")
- AppendFindResponse(ctx, 2, &find, &updatedResult, &notModifiedResult,
&failedResult)
+ AppendFindResponse(ctx, 2, find.GetResponse(), find.Instances,
&updatedResult, &notModifiedResult, &failedResult)
if updatedResult != nil || notModifiedResult != nil || failedResult ==
nil {
t.Fatal("TestAppendFindResponse failed")
}
@@ -174,15 +174,15 @@ func TestAppendFindResponse(t *testing.T) {
failedResult = nil
find.Response = nil
- AppendFindResponse(ctx, 1, &find, &updatedResult, &notModifiedResult,
&failedResult)
- AppendFindResponse(ctx, 2, &find, &updatedResult, &notModifiedResult,
&failedResult)
+ AppendFindResponse(ctx, 1, find.GetResponse(), find.Instances,
&updatedResult, &notModifiedResult, &failedResult)
+ AppendFindResponse(ctx, 2, find.GetResponse(), find.Instances,
&updatedResult, &notModifiedResult, &failedResult)
cloneCtx = context.WithValue(ctx, CTX_REQUEST_REVISION, "1")
cloneCtx = context.WithValue(cloneCtx, CTX_RESPONSE_REVISION, "1")
- AppendFindResponse(cloneCtx, 3, &find, &updatedResult,
&notModifiedResult, &failedResult)
- AppendFindResponse(cloneCtx, 4, &find, &updatedResult,
&notModifiedResult, &failedResult)
+ AppendFindResponse(cloneCtx, 3, find.GetResponse(), find.Instances,
&updatedResult, &notModifiedResult, &failedResult)
+ AppendFindResponse(cloneCtx, 4, find.GetResponse(), find.Instances,
&updatedResult, &notModifiedResult, &failedResult)
find.Response = pb.CreateResponse(scerr.ErrInternal, "test")
- AppendFindResponse(ctx, 5, &find, &updatedResult, &notModifiedResult,
&failedResult)
- AppendFindResponse(ctx, 6, &find, &updatedResult, &notModifiedResult,
&failedResult)
+ AppendFindResponse(ctx, 5, find.GetResponse(), find.Instances,
&updatedResult, &notModifiedResult, &failedResult)
+ AppendFindResponse(ctx, 6, find.GetResponse(), find.Instances,
&updatedResult, &notModifiedResult, &failedResult)
if updatedResult == nil || notModifiedResult == nil || failedResult ==
nil {
t.Fatal("TestAppendFindResponse failed")
}