This is an automated email from the ASF dual-hosted git repository.
littlecui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/servicecomb-service-center.git
The following commit(s) were added to refs/heads/master by this push:
new 0e8562e SCB-1053 Batch microservices instances discovery API (#502)
0e8562e is described below
commit 0e8562ed709191f1543a9b232ad5dc73013632a9
Author: little-cui <[email protected]>
AuthorDate: Mon Dec 3 09:04:51 2018 +0800
SCB-1053 Batch microservices instances discovery API (#502)
* SCB-1053 Batch microservices instances discovery API
* SCB-1053 Update swagger
* SCB-1053 Implement the batch query API
* SCB-1053 Add UTs
---
integration/instances_test.go | 46 +++
server/core/proto/batch_find.go | 48 +++
server/core/proto/services.go | 3 +
server/core/proto/types.go | 6 +-
server/core/swagger/v4.yaml | 132 ++++++-
server/plugin/pkg/discovery/aggregate/repo.go | 8 +
server/plugin/pkg/discovery/etcd/repo.go | 7 +
server/plugin/pkg/discovery/servicecenter/repo.go | 7 +
server/plugin/pkg/registry/config.go | 17 +-
server/rest/controller/v4/instance_controller.go | 24 ++
server/service/instance.go | 40 ++
server/service/instance_test.go | 437 +++++++++++++++++++++-
server/service/instance_validator.go | 12 +
server/service/util/instance_util.go | 24 ++
server/service/util/instance_util_test.go | 75 ++++
server/service/validate.go | 2 +
16 files changed, 861 insertions(+), 27 deletions(-)
diff --git a/integration/instances_test.go b/integration/instances_test.go
index eb259bd..86976a5 100644
--- a/integration/instances_test.go
+++ b/integration/instances_test.go
@@ -348,6 +348,52 @@ var _ = Describe("MicroService Api Test", func() {
rev = resp.Header.Get("X-Resource-Revision")
Expect(rev).NotTo(BeEmpty())
})
+
+	// Sends one batch discovery request containing an existing provider
+	// (index 0) and an unknown service (index 1), then checks that the
+	// provider is reported under "updated" and the unknown service under
+	// "failed" with error code 400012.
+	It("Batch Find Micro-service Instance", func() {
+		notExistsService := map[string]interface{}{
+			"service": map[string]interface{}{
+				"appId":       serviceAppId,
+				"serviceName": "notexisted",
+				"version":     serviceVersion,
+			},
+		}
+		provider := map[string]interface{}{
+			"service": map[string]interface{}{
+				"appId":       serviceAppId,
+				"serviceName": serviceName,
+				"version":     serviceVersion,
+			},
+		}
+		findRequest := map[string]interface{}{
+			"services": []map[string]interface{}{
+				provider,
+				notExistsService,
+			},
+		}
+		body, err := json.Marshal(findRequest)
+		Expect(err).To(BeNil())
+		req, err := http.NewRequest(POST, SCURL+FINDINSTANCE, bytes.NewReader(body))
+		Expect(err).To(BeNil())
+		req.Header.Set("X-Domain-Name", "default")
+		req.Header.Set("X-ConsumerId", serviceId)
+		resp, err := scclient.Do(req)
+		// Fail here instead of dereferencing a nil response below.
+		Expect(err).To(BeNil())
+		defer resp.Body.Close()
+		respbody, err := ioutil.ReadAll(resp.Body)
+		Expect(err).To(BeNil())
+		Expect(resp.StatusCode).To(Equal(http.StatusOK))
+		servicesStruct := map[string][]map[string]interface{}{}
+		Expect(json.Unmarshal(respbody, &servicesStruct)).To(BeNil())
+		failed := false
+		for _, services := range servicesStruct["failed"] {
+			// JSON numbers decode to float64, hence the comparison with 1.0.
+			a := services["indexes"].([]interface{})[0] == 1.0
+			b := services["error"].(map[string]interface{})["errorCode"] == "400012"
+			if a && b {
+				failed = true
+				break
+			}
+		}
+		Expect(failed).To(Equal(true))
+		// Guard the index expressions below against an empty "updated" list.
+		Expect(servicesStruct["updated"]).NotTo(BeEmpty())
+		Expect(servicesStruct["updated"][0]["index"]).To(Equal(0.0))
+		Expect(len(servicesStruct["updated"][0]["instances"].([]interface{}))).
+			ToNot(Equal(0))
+	})
})
By("Update Micro-Service Instance Information API's", func() {
diff --git a/server/core/proto/batch_find.go b/server/core/proto/batch_find.go
new file mode 100644
index 0000000..68e3883
--- /dev/null
+++ b/server/core/proto/batch_find.go
@@ -0,0 +1,48 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package proto
+
+import (
+ scerr "github.com/apache/servicecomb-service-center/server/error"
+)
+
+// FindService is one entry of a batch discovery request: the key of the
+// microservice to look up and the revision sent along with that lookup.
+type FindService struct {
+	// Service identifies the microservice whose instances are requested.
+	Service *MicroServiceKey `protobuf:"bytes,1,opt,name=service" json:"service"`
+	// Rev is the request revision for this entry; omitted from JSON when empty.
+	Rev string `protobuf:"bytes,2,opt,name=rev" json:"rev,omitempty"`
+}
+
+// FindResult carries the instances found for one entry of a batch discovery
+// request.
+type FindResult struct {
+	// Index is the position of the corresponding entry in the request.
+	Index int64 `protobuf:"varint,1,opt,name=index" json:"index"`
+	// Rev is the revision reported for the returned instance set.
+	Rev string `protobuf:"bytes,2,opt,name=rev" json:"rev"`
+	// Instances are the instances found for the requested microservice.
+	Instances []*MicroServiceInstance `protobuf:"bytes,3,rep,name=instances" json:"instances"`
+}
+
+// FindFailedResult groups the request entries that failed with the same
+// error.
+type FindFailedResult struct {
+	// Indexes are the positions of the failed entries in the request.
+	Indexes []int64 `protobuf:"varint,1,rep,packed,name=indexes" json:"indexes"`
+	// Error describes why the listed entries failed.
+	Error *scerr.Error `protobuf:"bytes,2,opt,name=error" json:"error"`
+}
+
+// BatchFindInstancesRequest asks for the instances of several microservices
+// in a single call.
+type BatchFindInstancesRequest struct {
+	// ConsumerServiceId is the service id of the consumer issuing the query;
+	// omitted from JSON when empty.
+	ConsumerServiceId string `protobuf:"bytes,1,opt,name=consumerServiceId" json:"consumerServiceId,omitempty"`
+	// Services lists the microservices to look up.
+	Services []*FindService `protobuf:"bytes,2,rep,name=services" json:"services"`
+}
+
+// BatchFindInstancesResponse is the result of a batch discovery request.
+// Request entries are referred to by their index in the request.
+type BatchFindInstancesResponse struct {
+	// Response is the overall status of the batch call.
+	Response *Response `protobuf:"bytes,1,opt,name=response" json:"response,omitempty"`
+	// Failed lists the entries that could not be resolved, grouped by error.
+	Failed []*FindFailedResult `protobuf:"bytes,2,rep,name=failed" json:"failed,omitempty"`
+	// NotModified holds the indexes of entries reported as not modified.
+	NotModified []int64 `protobuf:"varint,3,rep,packed,name=notModified" json:"notModified,omitempty"`
+	// Updated holds the entries returned together with their instances.
+	Updated []*FindResult `protobuf:"bytes,4,rep,name=updated" json:"updated,omitempty"`
+}
diff --git a/server/core/proto/services.go b/server/core/proto/services.go
index 80b158a..a2e6509 100644
--- a/server/core/proto/services.go
+++ b/server/core/proto/services.go
@@ -24,7 +24,10 @@ import (
type ServiceInstanceCtrlServerEx interface {
ServiceInstanceCtrlServer
+ BatchFind(ctx context.Context, in *BatchFindInstancesRequest)
(*BatchFindInstancesResponse, error)
+
WebSocketWatch(ctx context.Context, in *WatchInstanceRequest, conn
*websocket.Conn)
WebSocketListAndWatch(ctx context.Context, in *WatchInstanceRequest,
conn *websocket.Conn)
+
ClusterHealth(ctx context.Context) (*GetInstancesResponse, error)
}
diff --git a/server/core/proto/types.go b/server/core/proto/types.go
index d3624bc..36de9a4 100644
--- a/server/core/proto/types.go
+++ b/server/core/proto/types.go
@@ -47,8 +47,8 @@ type ServerConfig struct {
CompactIndexDelta int64 `json:"-"`
CompactInterval string `json:"-"`
- EnablePProf bool `json:"-"`
- EnableCache bool `json:"-"`
+ EnablePProf bool `json:"enablePProf"`
+ EnableCache bool `json:"enableCache"`
LogRotateSize int64 `json:"-"`
LogBackupCount int64 `json:"-"`
@@ -60,7 +60,7 @@ type ServerConfig struct {
PluginsDir string `json:"-"`
Plugins util.JSONObject `json:"plugins"`
- SelfRegister bool `json:"-"`
+ SelfRegister bool `json:"selfRegister"`
}
type ServerInformation struct {
diff --git a/server/core/swagger/v4.yaml b/server/core/swagger/v4.yaml
index 75ff962..7d3aaef 100644
--- a/server/core/swagger/v4.yaml
+++ b/server/core/swagger/v4.yaml
@@ -1370,13 +1370,16 @@ paths:
in: header
type: string
default: default
- - name: project
- in: path
- required: true
+        # Request header carrying the revision cached by the client; spelled to
+        # match the X-Resource-Revision header returned by this operation.
+        - name: x-resource-revision
+          in: header
           type: string
+          description: 客户端缓存的版本号,由上一次请求该API返回Header中获得;如请求版本号不为空且与服务端不匹配则服务端返回其最新的实例集合和版本号;如匹配则服务端返回304状态且Body为空。
- name: X-ConsumerId
in: header
description: 微服务消费者的微服务唯一标识。
+ type: string
+ - name: project
+ in: path
required: true
type: string
- name: appId
@@ -1407,6 +1410,10 @@ paths:
responses:
200:
description: 查询成功
+ headers:
+ "X-Resource-Revision":
+ type: "string"
+ description: 返回集合的版本号,当集合内容发生变化,版本号随之变化
schema:
$ref: '#/definitions/GetInstancesResponse'
400:
@@ -1417,6 +1424,44 @@ paths:
description: 内部错误
schema:
type: string
+ post:
+ description: |
+ 批量微服务实例发现接口
+ operationId: batchFind
+ parameters:
+ - name: x-domain-name
+ in: header
+ type: string
+ default: default
+ - name: X-ConsumerId
+ in: header
+ description: 微服务消费者的微服务唯一标识。
+ type: string
+ - name: project
+ in: path
+ required: true
+ type: string
+ - name: services
+ in: body
+ description: 查询微服务的请求结构体
+ required: true
+ schema:
+ $ref: '#/definitions/BatchFindRequest'
+ tags:
+ - instances
+ responses:
+ 200:
+ description: 查询成功
+ schema:
+ $ref: '#/definitions/BatchFindResponse'
+ 400:
+ description: 错误的请求
+ schema:
+ type: string
+ 500:
+ description: 内部错误
+ schema:
+ type: string
/v4/{project}/registry/microservices/{serviceId}/watcher:
get:
description: |
@@ -1692,6 +1737,8 @@ definitions:
type: string
arch:
type: string
+ config:
+ $ref: '#/definitions/Config'
Properties:
type: object
description: 扩展属性
@@ -1907,6 +1954,60 @@ definitions:
modTimestamp:
type: string
description: 更新时间
+  # One entry of a batch discovery request: the microservice key to look up
+  # and the revision cached by the client for it.
+  FindService:
+    type: object
+    properties:
+      service:
+        $ref: '#/definitions/DependencyKey'
+      rev:
+        type: string
+        description: 客户端缓存的版本号。
+ BatchFindRequest:
+ type: object
+ properties:
+ services:
+ type: array
+ items:
+ $ref: '#/definitions/FindService'
+ FindResult:
+ type: object
+ properties:
+ index:
+ type: integer
+ description: 与请求数组对应的索引。
+ rev:
+ type: string
+ description: 服务端返回集合版本,如跟客户端缓存版本号一致,则instances为空。
+ instances:
+ type: array
+ items:
+ $ref: '#/definitions/MicroServiceInstance'
+ FindFailedResult:
+ type: object
+ properties:
+ indexes:
+ type: array
+ items:
+ type: integer
+ description: 与请求数组对应的索引集合。
+ error:
+ $ref: '#/definitions/Error'
+ BatchFindResponse:
+ type: object
+ properties:
+ failed:
+ type: array
+ items:
+ $ref: '#/definitions/FindFailedResult'
+ notModified:
+ type: array
+ items:
+ type: integer
+ description: 与请求数组对应的索引集合。
+ updated:
+ type: array
+ items:
+ $ref: '#/definitions/FindResult'
CreateDependenciesRequest:
type: object
properties:
@@ -2413,16 +2514,12 @@ definitions:
type: string
sslCiphers:
type: string
- autoSyncInterval:
- type: string
- compactIndexDelta:
- type: integer
- compactInterval:
- type: string
- logRotateSize:
- type: integer
- logBackupCount:
- type: integer
+ enablePProf:
+ type: boolean
+ enableCache:
+ type: boolean
+ selfRegister:
+ type: boolean
DumpResponse:
type: object
properties:
@@ -2446,3 +2543,12 @@ definitions:
properties:
clusters:
$ref: '#/definitions/Clusters'
+ Error:
+ type: object
+ properties:
+ errorCode:
+ type: string
+ errorMessage:
+ type: string
+ detail:
+ type: string
diff --git a/server/plugin/pkg/discovery/aggregate/repo.go
b/server/plugin/pkg/discovery/aggregate/repo.go
index cb6d7e3..4393bd5 100644
--- a/server/plugin/pkg/discovery/aggregate/repo.go
+++ b/server/plugin/pkg/discovery/aggregate/repo.go
@@ -18,6 +18,7 @@ package aggregate
import (
mgr "github.com/apache/servicecomb-service-center/server/plugin"
"github.com/apache/servicecomb-service-center/server/plugin/pkg/discovery"
+
"github.com/apache/servicecomb-service-center/server/plugin/pkg/registry"
)
func init() {
@@ -32,5 +33,12 @@ func (r *AggregateRepository) New(t discovery.Type, cfg
*discovery.Config) disco
}
func NewRepository() mgr.PluginInstance {
+ InitConfigs()
return &AggregateRepository{}
}
+
+// InitConfigs stores the registry configuration under "config" and repos
+// under "aggregateMode" in the discovery plugin's active configs.
+func InitConfigs() {
+	active := mgr.DISCOVERY.ActiveConfigs()
+	withConfig := active.Set("config", registry.Configuration())
+	withConfig.Set("aggregateMode", repos)
+}
diff --git a/server/plugin/pkg/discovery/etcd/repo.go
b/server/plugin/pkg/discovery/etcd/repo.go
index 3fb614d..8239fa1 100644
--- a/server/plugin/pkg/discovery/etcd/repo.go
+++ b/server/plugin/pkg/discovery/etcd/repo.go
@@ -18,6 +18,7 @@ package etcd
import (
mgr "github.com/apache/servicecomb-service-center/server/plugin"
"github.com/apache/servicecomb-service-center/server/plugin/pkg/discovery"
+
"github.com/apache/servicecomb-service-center/server/plugin/pkg/registry"
)
func init() {
@@ -33,5 +34,11 @@ func (r *EtcdRepository) New(t discovery.Type, cfg
*discovery.Config) discovery.
}
func NewRepository() mgr.PluginInstance {
+ InitConfigs()
return &EtcdRepository{}
}
+
+// InitConfigs stores the registry configuration under "config" in the
+// discovery plugin's active configs.
+func InitConfigs() {
+	active := mgr.DISCOVERY.ActiveConfigs()
+	active.Set("config", registry.Configuration())
+}
diff --git a/server/plugin/pkg/discovery/servicecenter/repo.go
b/server/plugin/pkg/discovery/servicecenter/repo.go
index 93512bf..a15a3ea 100644
--- a/server/plugin/pkg/discovery/servicecenter/repo.go
+++ b/server/plugin/pkg/discovery/servicecenter/repo.go
@@ -18,6 +18,7 @@ package servicecenter
import (
mgr "github.com/apache/servicecomb-service-center/server/plugin"
"github.com/apache/servicecomb-service-center/server/plugin/pkg/discovery"
+
"github.com/apache/servicecomb-service-center/server/plugin/pkg/registry"
)
func init() {
@@ -32,5 +33,11 @@ func (r *ServiceCenterRepository) New(t discovery.Type, cfg
*discovery.Config) d
}
func NewRepository() mgr.PluginInstance {
+ InitConfigs()
return &ServiceCenterRepository{}
}
+
+// InitConfigs stores the registry configuration under "config" in the
+// discovery plugin's active configs.
+func InitConfigs() {
+	active := mgr.DISCOVERY.ActiveConfigs()
+	active.Set("config", registry.Configuration())
+}
diff --git a/server/plugin/pkg/registry/config.go
b/server/plugin/pkg/registry/config.go
index 6113cce..ef36bad 100644
--- a/server/plugin/pkg/registry/config.go
+++ b/server/plugin/pkg/registry/config.go
@@ -30,15 +30,14 @@ var (
)
type Config struct {
- SslEnabled bool
- EmbedMode string
- ManagerAddress string
- ClusterName string
- ClusterAddresses string // the raw string of cluster configuration
- Clusters Clusters // parsed from ClusterAddresses
- DialTimeout time.Duration
- RequestTimeOut time.Duration
- AutoSyncInterval time.Duration
+ SslEnabled bool `json:"-"`
+ ManagerAddress string `json:"manageAddress,omitempty"`
+ ClusterName string `json:"manageName,omitempty"`
+ ClusterAddresses string `json:"manageClusters,omitempty"` // the
raw string of cluster configuration
+ Clusters Clusters `json:"-"` //
parsed from ClusterAddresses
+ DialTimeout time.Duration `json:"connectTimeout"`
+ RequestTimeOut time.Duration `json:"registryTimeout"`
+ AutoSyncInterval time.Duration `json:"autoSyncInterval"`
}
func (c *Config) InitClusters() {
diff --git a/server/rest/controller/v4/instance_controller.go
b/server/rest/controller/v4/instance_controller.go
index d6e5fdb..36efd16 100644
--- a/server/rest/controller/v4/instance_controller.go
+++ b/server/rest/controller/v4/instance_controller.go
@@ -38,6 +38,7 @@ type MicroServiceInstanceService struct {
func (this *MicroServiceInstanceService) URLPatterns() []rest.Route {
return []rest.Route{
{rest.HTTP_METHOD_GET, "/v4/:project/registry/instances",
this.FindInstances},
+ {rest.HTTP_METHOD_POST, "/v4/:project/registry/instances",
this.BatchFindInstances},
{rest.HTTP_METHOD_GET,
"/v4/:project/registry/microservices/:serviceId/instances", this.GetInstances},
{rest.HTTP_METHOD_GET,
"/v4/:project/registry/microservices/:serviceId/instances/:instanceId",
this.GetOneInstance},
{rest.HTTP_METHOD_POST,
"/v4/:project/registry/microservices/:serviceId/instances",
this.RegisterInstance},
@@ -154,6 +155,29 @@ func (this *MicroServiceInstanceService) FindInstances(w
http.ResponseWriter, r
controller.WriteResponse(w, respInternal, resp)
}
+func (this *MicroServiceInstanceService) BatchFindInstances(w
http.ResponseWriter, r *http.Request) {
+ message, err := ioutil.ReadAll(r.Body)
+ if err != nil {
+ log.Error("read body failed", err)
+ controller.WriteError(w, scerr.ErrInvalidParams, err.Error())
+ return
+ }
+
+ request := &pb.BatchFindInstancesRequest{}
+ err = json.Unmarshal(message, request)
+ if err != nil {
+ log.Errorf(err, "Invalid json: %s",
util.BytesToStringWithNoCopy(message))
+ controller.WriteError(w, scerr.ErrInvalidParams, "Unmarshal
error")
+ return
+ }
+ request.ConsumerServiceId = r.Header.Get("X-ConsumerId")
+ ctx := util.SetTargetDomainProject(r.Context(),
r.Header.Get("X-Domain-Name"), r.URL.Query().Get(":project"))
+ resp, _ := core.InstanceAPI.BatchFind(ctx, request)
+ respInternal := resp.Response
+ resp.Response = nil
+ controller.WriteResponse(w, respInternal, resp)
+}
+
func (this *MicroServiceInstanceService) GetOneInstance(w http.ResponseWriter,
r *http.Request) {
var ids []string
query := r.URL.Query()
diff --git a/server/service/instance.go b/server/service/instance.go
index 667d675..53eca31 100644
--- a/server/service/instance.go
+++ b/server/service/instance.go
@@ -616,6 +616,46 @@ func (s *InstanceService) Find(ctx context.Context, in
*pb.FindInstancesRequest)
}, nil
}
+// BatchFind resolves the instances of every service listed in in.Services
+// on behalf of in.ConsumerServiceId.
+//
+// Each entry is looked up with Find, using the entry's Rev as the request
+// revision. Per-entry outcomes are collected into Updated, NotModified and
+// Failed; failed entries that share a response code are merged into one
+// FindFailedResult. Failed is ordered by the first occurrence of each code,
+// so the output is stable across calls.
+//
+// An invalid request yields a response with ErrInvalidParams and a nil
+// error. If Find itself returns an error the batch is aborted and that
+// error is returned together with the response of the failing lookup.
+func (s *InstanceService) BatchFind(ctx context.Context, in *pb.BatchFindInstancesRequest) (*pb.BatchFindInstancesResponse, error) {
+	err := Validate(in)
+	if err != nil {
+		log.Errorf(err, "batch find instance failed: invalid parameters")
+		return &pb.BatchFindInstancesResponse{
+			Response: pb.CreateResponse(scerr.ErrInvalidParams, err.Error()),
+		}, nil
+	}
+
+	response := &pb.BatchFindInstancesResponse{
+		Response: pb.CreateResponse(pb.Response_SUCCESS, "Batch query service instances successfully."),
+	}
+	failedResult := make(map[int32]*pb.FindFailedResult)
+	// failedCodes remembers the order in which failure codes first appeared;
+	// ranging over the map directly would emit Failed in random order.
+	var failedCodes []int32
+	for index, key := range in.Services {
+		cloneCtx := util.SetContext(ctx, serviceUtil.CTX_REQUEST_REVISION, key.Rev)
+		resp, err := s.Find(cloneCtx, &pb.FindInstancesRequest{
+			ConsumerServiceId: in.ConsumerServiceId,
+			AppId:             key.Service.AppId,
+			ServiceName:       key.Service.ServiceName,
+			VersionRule:       key.Service.Version,
+			Environment:       key.Service.Environment,
+		})
+		if err != nil {
+			// Use the nil-safe getter: resp may be nil when err is set.
+			return &pb.BatchFindInstancesResponse{
+				Response: resp.GetResponse(),
+			}, err
+		}
+		code := resp.GetResponse().GetCode()
+		failed, ok := failedResult[code]
+		serviceUtil.AppendFindResponse(cloneCtx, int64(index), resp,
+			&response.Updated, &response.NotModified, &failed)
+		if !ok && failed != nil {
+			failedResult[code] = failed
+			failedCodes = append(failedCodes, code)
+		}
+	}
+	for _, code := range failedCodes {
+		response.Failed = append(response.Failed, failedResult[code])
+	}
+	return response, nil
+}
+
func (s *InstanceService) reshapeProviderKey(ctx context.Context, provider
*pb.MicroServiceKey, providerId string) (*pb.MicroServiceKey, error) {
//维护version的规则,service name 可能是别名,所以重新获取
providerService, err := serviceUtil.GetService(ctx, provider.Tenant,
providerId)
diff --git a/server/service/instance_test.go b/server/service/instance_test.go
index d40dd63..3b67e01 100644
--- a/server/service/instance_test.go
+++ b/server/service/instance_test.go
@@ -1035,7 +1035,7 @@ var _ = Describe("'Instance' service", func() {
respFind, err =
instanceResource.Find(getContext(), &pb.FindInstancesRequest{
ConsumerServiceId: serviceId1,
AppId: "query_instance",
- ServiceName: "",
+ ServiceName: " ",
VersionRule: "1.0.0",
})
Expect(err).To(BeNil())
@@ -1135,7 +1135,248 @@ var _ = Describe("'Instance' service", func() {
VersionRule: "2.0.0",
})
Expect(err).To(BeNil())
-
Expect(respFind.Response.Code).ToNot(Equal(pb.Response_SUCCESS))
+
Expect(respFind.Response.Code).To(Equal(scerr.ErrServiceNotExists))
+ })
+ })
+
+ Context("when batch query invalid parameters", func() {
+ It("should be failed", func() {
+ By("invalid services")
+ respFind, err :=
instanceResource.BatchFind(getContext(), &pb.BatchFindInstancesRequest{
+ ConsumerServiceId: serviceId1,
+ Services: nil,
+ })
+ Expect(err).To(BeNil())
+
Expect(respFind.Response.Code).To(Equal(scerr.ErrInvalidParams))
+ respFind, err =
instanceResource.BatchFind(getContext(), &pb.BatchFindInstancesRequest{
+ ConsumerServiceId: serviceId1,
+ Services: []*pb.FindService{},
+ })
+ Expect(err).To(BeNil())
+
Expect(respFind.Response.Code).To(Equal(scerr.ErrInvalidParams))
+ respFind, err =
instanceResource.BatchFind(getContext(), &pb.BatchFindInstancesRequest{
+ ConsumerServiceId: serviceId1,
+ Services:
[]*pb.FindService{{}},
+ })
+ Expect(err).To(BeNil())
+
Expect(respFind.Response.Code).To(Equal(scerr.ErrInvalidParams))
+
+ By("invalid appId")
+ respFind, err =
instanceResource.BatchFind(getContext(), &pb.BatchFindInstancesRequest{
+ ConsumerServiceId: serviceId1,
+ Services: []*pb.FindService{
+ {
+ Service:
&pb.MicroServiceKey{
+ AppId:
TOO_LONG_APPID,
+ ServiceName:
"query_instance_service",
+ Version:
"1.0.0",
+ },
+ },
+ },
+ })
+ Expect(err).To(BeNil())
+
Expect(respFind.Response.Code).To(Equal(scerr.ErrInvalidParams))
+ respFind, err =
instanceResource.BatchFind(getContext(), &pb.BatchFindInstancesRequest{
+ ConsumerServiceId: serviceId1,
+ Services: []*pb.FindService{
+ {
+ Service:
&pb.MicroServiceKey{
+ AppId: "",
+ ServiceName:
"query_instance_service",
+ Version:
"1.0.0",
+ },
+ },
+ },
+ })
+ Expect(err).To(BeNil())
+
Expect(respFind.Response.Code).To(Equal(scerr.ErrInvalidParams))
+ respFind, err =
instanceResource.BatchFind(getContext(), &pb.BatchFindInstancesRequest{
+ ConsumerServiceId: serviceId1,
+ Services: []*pb.FindService{
+ {
+ Service:
&pb.MicroServiceKey{
+ AppId: "
",
+ ServiceName:
"query_instance_service",
+ Version:
"1.0.0",
+ },
+ },
+ },
+ })
+ Expect(err).To(BeNil())
+
Expect(respFind.Response.Code).To(Equal(scerr.ErrInvalidParams))
+
+ By("invalid serviceName")
+ respFind, err =
instanceResource.BatchFind(getContext(), &pb.BatchFindInstancesRequest{
+ ConsumerServiceId: serviceId1,
+ Services: []*pb.FindService{
+ {
+ Service:
&pb.MicroServiceKey{
+ AppId:
"query_instance",
+ ServiceName:
TOO_LONG_EXISTENCE,
+ Version:
"1.0.0",
+ },
+ },
+ },
+ })
+ Expect(err).To(BeNil())
+
Expect(respFind.Response.Code).To(Equal(scerr.ErrInvalidParams))
+ respFind, err =
instanceResource.BatchFind(getContext(), &pb.BatchFindInstancesRequest{
+ ConsumerServiceId: serviceId1,
+ Services: []*pb.FindService{
+ {
+ Service:
&pb.MicroServiceKey{
+ AppId:
"query_instance",
+ ServiceName: "",
+ Version:
"1.0.0",
+ },
+ },
+ },
+ })
+ Expect(err).To(BeNil())
+
Expect(respFind.Response.Code).To(Equal(scerr.ErrInvalidParams))
+ respFind, err =
instanceResource.BatchFind(getContext(), &pb.BatchFindInstancesRequest{
+ ConsumerServiceId: serviceId1,
+ Services: []*pb.FindService{
+ {
+ Service:
&pb.MicroServiceKey{
+ AppId:
"query_instance",
+ ServiceName: "
",
+ Version:
"1.0.0",
+ },
+ },
+ },
+ })
+ Expect(err).To(BeNil())
+
Expect(respFind.Response.Code).To(Equal(scerr.ErrInvalidParams))
+
+ By("invalid version")
+ respFind, err =
instanceResource.BatchFind(getContext(), &pb.BatchFindInstancesRequest{
+ ConsumerServiceId: serviceId1,
+ Services: []*pb.FindService{
+ {
+ Service:
&pb.MicroServiceKey{
+ AppId:
"query_instance",
+ ServiceName:
"query_instance_service",
+ Version:
"1.32768.0",
+ },
+ },
+ },
+ })
+ Expect(err).To(BeNil())
+
Expect(respFind.Response.Code).To(Equal(scerr.ErrInvalidParams))
+ respFind, err =
instanceResource.BatchFind(getContext(), &pb.BatchFindInstancesRequest{
+ ConsumerServiceId: serviceId1,
+ Services: []*pb.FindService{
+ {
+ Service:
&pb.MicroServiceKey{
+ AppId:
"query_instance",
+ ServiceName:
"query_instance_service",
+ Version:
"1.0.0-1.32768.0",
+ },
+ },
+ },
+ })
+ Expect(err).To(BeNil())
+
Expect(respFind.Response.Code).To(Equal(scerr.ErrInvalidParams))
+ respFind, err =
instanceResource.BatchFind(getContext(), &pb.BatchFindInstancesRequest{
+ ConsumerServiceId: serviceId1,
+ Services: []*pb.FindService{
+ {
+ Service:
&pb.MicroServiceKey{
+ AppId:
"query_instance",
+ ServiceName:
"query_instance_service",
+ Version: "
",
+ },
+ },
+ },
+ })
+ Expect(err).To(BeNil())
+
Expect(respFind.Response.Code).To(Equal(scerr.ErrInvalidParams))
+ respFind, err =
instanceResource.BatchFind(getContext(), &pb.BatchFindInstancesRequest{
+ ConsumerServiceId: serviceId1,
+ Services: []*pb.FindService{
+ {
+ Service:
&pb.MicroServiceKey{
+ AppId:
"query_instance",
+ ServiceName:
"query_instance_service",
+ Version: "",
+ },
+ },
+ },
+ })
+ Expect(err).To(BeNil())
+
Expect(respFind.Response.Code).To(Equal(scerr.ErrInvalidParams))
+
+				By("consumerId is empty")
+				// ConsumerServiceId is deliberately left unset so that this step
+				// really exercises the empty-consumer case named above.
+				respFind, err = instanceResource.BatchFind(getContext(), &pb.BatchFindInstancesRequest{
+					Services: []*pb.FindService{
+						{
+							Service: &pb.MicroServiceKey{
+								AppId:       "query_instance",
+								ServiceName: "query_instance_service",
+								Version:     "1.0.0+",
+							},
+						},
+					},
+				})
+				Expect(err).To(BeNil())
+				Expect(respFind.Response.Code).To(Equal(pb.Response_SUCCESS))
+
+ By("provider does not exist")
+ respFind, err =
instanceResource.BatchFind(getContext(), &pb.BatchFindInstancesRequest{
+ ConsumerServiceId: serviceId1,
+ Services: []*pb.FindService{
+ {
+ Service:
&pb.MicroServiceKey{
+ AppId:
"query_instance",
+ ServiceName:
"noneservice",
+ Version:
"latest",
+ },
+ },
+ },
+ })
+ Expect(err).To(BeNil())
+
Expect(respFind.Response.Code).To(Equal(pb.Response_SUCCESS))
+
Expect(respFind.Failed[0].Error.Code).To(Equal(scerr.ErrServiceNotExists))
+
Expect(respFind.Failed[0].Indexes[0]).To(Equal(int64(0)))
+
+ By("provider does not contain 3.0.0+ versions")
+ respFind, err =
instanceResource.BatchFind(getContext(), &pb.BatchFindInstancesRequest{
+ ConsumerServiceId: serviceId1,
+ Services: []*pb.FindService{
+ {
+ Service:
&pb.MicroServiceKey{
+ AppId:
"query_instance",
+ ServiceName:
"query_instance_service",
+ Version:
"3.0.0+",
+ },
+ },
+ },
+ })
+ Expect(err).To(BeNil())
+
Expect(respFind.Response.Code).To(Equal(pb.Response_SUCCESS))
+
Expect(len(respFind.Updated[0].Instances)).To(Equal(0))
+
Expect(respFind.Updated[0].Index).To(Equal(int64(0)))
+ Expect(respFind.Updated[0].Rev).ToNot(Equal(""))
+
+ By("consumer does not exist")
+ respFind, err =
instanceResource.BatchFind(getContext(), &pb.BatchFindInstancesRequest{
+ ConsumerServiceId: "notExistServiceId",
+ Services: []*pb.FindService{
+ {
+ Service:
&pb.MicroServiceKey{
+ AppId:
"query_instance",
+ ServiceName:
"query_instance_service",
+ Version:
"2.0.0",
+ },
+ },
+ },
+ })
+ Expect(err).To(BeNil())
+
Expect(respFind.Response.Code).To(Equal(pb.Response_SUCCESS))
+
Expect(respFind.Failed[0].Indexes[0]).To(Equal(int64(0)))
+
Expect(respFind.Failed[0].Error.Code).To(Equal(scerr.ErrServiceNotExists))
})
})
@@ -1322,6 +1563,198 @@ var _ = Describe("'Instance' service", func() {
})
})
+ Context("when batch query instances", func() {
+ It("should be passed", func() {
+ By("find with version rule")
+ respFind, err :=
instanceResource.BatchFind(getContext(), &pb.BatchFindInstancesRequest{
+ ConsumerServiceId: serviceId1,
+ Services: []*pb.FindService{
+ {
+ Service:
&pb.MicroServiceKey{
+ AppId:
"query_instance",
+ ServiceName:
"query_instance_service",
+ Version:
"latest",
+ },
+ },
+ {
+ Service:
&pb.MicroServiceKey{
+ AppId:
"query_instance",
+ ServiceName:
"query_instance_service",
+ Version:
"1.0.0+",
+ },
+ },
+ {
+ Service:
&pb.MicroServiceKey{
+ AppId:
"query_instance",
+ ServiceName:
"query_instance_service",
+ Version:
"0.0.0",
+ },
+ },
+ },
+ })
+ Expect(err).To(BeNil())
+
Expect(respFind.Response.Code).To(Equal(pb.Response_SUCCESS))
+
Expect(respFind.Updated[0].Index).To(Equal(int64(0)))
+
Expect(respFind.Updated[0].Instances[0].InstanceId).To(Equal(instanceId2))
+
Expect(respFind.Updated[1].Index).To(Equal(int64(1)))
+
Expect(respFind.Updated[1].Instances[0].InstanceId).To(Equal(instanceId2))
+
Expect(respFind.Failed[0].Indexes[0]).To(Equal(int64(2)))
+
Expect(respFind.Failed[0].Error.Code).To(Equal(scerr.ErrServiceNotExists))
+
+ By("find with env")
+ respFind, err =
instanceResource.BatchFind(getContext(), &pb.BatchFindInstancesRequest{
+ ConsumerServiceId: serviceId4,
+ Services: []*pb.FindService{
+ {
+ Service:
&pb.MicroServiceKey{
+ AppId:
"query_instance",
+ ServiceName:
"query_instance_diff_env_service",
+ Version:
"1.0.0",
+ },
+ },
+ },
+ })
+ Expect(err).To(BeNil())
+
Expect(respFind.Response.Code).To(Equal(pb.Response_SUCCESS))
+
Expect(len(respFind.Updated[0].Instances)).To(Equal(1))
+
Expect(respFind.Updated[0].Instances[0].InstanceId).To(Equal(instanceId4))
+
+ respFind, err =
instanceResource.BatchFind(getContext(), &pb.BatchFindInstancesRequest{
+ Services: []*pb.FindService{
+ {
+ Service:
&pb.MicroServiceKey{
+ Environment:
pb.ENV_PROD,
+ AppId:
"query_instance",
+ ServiceName:
"query_instance_diff_env_service",
+ Version:
"1.0.0",
+ },
+ },
+ },
+ })
+ Expect(err).To(BeNil())
+
Expect(respFind.Response.Code).To(Equal(pb.Response_SUCCESS))
+
Expect(len(respFind.Updated[0].Instances)).To(Equal(1))
+
Expect(respFind.Updated[0].Instances[0].InstanceId).To(Equal(instanceId4))
+
+ By("find with rev")
+ ctx := util.SetContext(getContext(),
serviceUtil.CTX_NOCACHE, "")
+ respFind, err = instanceResource.BatchFind(ctx,
&pb.BatchFindInstancesRequest{
+ ConsumerServiceId: serviceId8,
+ Services: []*pb.FindService{
+ {
+ Service:
&pb.MicroServiceKey{
+ AppId:
"query_instance",
+ ServiceName:
"query_instance_with_rev",
+ Version:
"1.0.0",
+ },
+ },
+ },
+ })
+ Expect(err).To(BeNil())
+
Expect(respFind.Response.Code).To(Equal(pb.Response_SUCCESS))
+ rev := respFind.Updated[0].Rev
+
Expect(respFind.Updated[0].Instances[0].InstanceId).To(Equal(instanceId8))
+ Expect(len(rev)).NotTo(Equal(0))
+
+ respFind, err = instanceResource.BatchFind(ctx,
&pb.BatchFindInstancesRequest{
+ ConsumerServiceId: serviceId8,
+ Services: []*pb.FindService{
+ {
+ Service:
&pb.MicroServiceKey{
+ AppId:
"query_instance",
+ ServiceName:
"query_instance_with_rev",
+ Version:
"1.0.0",
+ },
+ Rev: "x",
+ },
+ },
+ })
+ Expect(err).To(BeNil())
+
Expect(respFind.Response.Code).To(Equal(pb.Response_SUCCESS))
+
Expect(respFind.Updated[0].Instances[0].InstanceId).To(Equal(instanceId8))
+ Expect(respFind.Updated[0].Rev).To(Equal(rev))
+
+ respFind, err = instanceResource.BatchFind(ctx,
&pb.BatchFindInstancesRequest{
+ ConsumerServiceId: serviceId8,
+ Services: []*pb.FindService{
+ {
+ Service:
&pb.MicroServiceKey{
+ AppId:
"query_instance",
+ ServiceName:
"query_instance_with_rev",
+ Version:
"1.0.0",
+ },
+ Rev: rev,
+ },
+ },
+ })
+ Expect(err).To(BeNil())
+
Expect(respFind.Response.Code).To(Equal(pb.Response_SUCCESS))
+
Expect(respFind.NotModified[0]).To(Equal(int64(0)))
+
+ By("find should return 200 even if consumer is
diff apps")
+ respFind, err =
instanceResource.BatchFind(getContext(), &pb.BatchFindInstancesRequest{
+ ConsumerServiceId: serviceId3,
+ Services: []*pb.FindService{
+ {
+ Service:
&pb.MicroServiceKey{
+ AppId:
"query_instance",
+ ServiceName:
"query_instance_service",
+ Version:
"1.0.5",
+ },
+ },
+ },
+ })
+ Expect(err).To(BeNil())
+
Expect(respFind.Response.Code).To(Equal(pb.Response_SUCCESS))
+
Expect(len(respFind.Updated[0].Instances)).To(Equal(0))
+
+ By("shared service discovery")
+ os.Setenv("CSE_SHARED_SERVICES",
"query_instance_shared_provider")
+ core.SetSharedMode()
+ core.Service.Environment = pb.ENV_PROD
+
+ respFind, err = instanceResource.BatchFind(
+ util.SetTargetDomainProject(
+
util.SetDomainProject(util.CloneContext(getContext()), "user", "user"),
+ "default", "default"),
+ &pb.BatchFindInstancesRequest{
+ ConsumerServiceId: serviceId6,
+ Services: []*pb.FindService{
+ {
+ Service:
&pb.MicroServiceKey{
+ AppId:
"default",
+
ServiceName: "query_instance_shared_provider",
+
Version: "1.0.0",
+ },
+ },
+ },
+ })
+ Expect(err).To(BeNil())
+
Expect(respFind.Response.Code).To(Equal(pb.Response_SUCCESS))
+
Expect(len(respFind.Updated[0].Instances)).To(Equal(1))
+
Expect(respFind.Updated[0].Instances[0].InstanceId).To(Equal(instanceId5))
+
+ respFind, err =
instanceResource.BatchFind(getContext(), &pb.BatchFindInstancesRequest{
+ ConsumerServiceId: serviceId7,
+ Services: []*pb.FindService{
+ {
+ Service:
&pb.MicroServiceKey{
+ AppId:
"default",
+ ServiceName:
"query_instance_shared_provider",
+ Version:
"1.0.0",
+ },
+ },
+ },
+ })
+ Expect(err).To(BeNil())
+
Expect(respFind.Response.Code).To(Equal(pb.Response_SUCCESS))
+
Expect(len(respFind.Updated[0].Instances)).To(Equal(1))
+
Expect(respFind.Updated[0].Instances[0].InstanceId).To(Equal(instanceId5))
+
+ core.Service.Environment = pb.ENV_DEV
+ })
+ })
+
Context("when query instances between diff dimensions", func() {
It("should be failed", func() {
By("diff appId")
diff --git a/server/service/instance_validator.go
b/server/service/instance_validator.go
index 9668d76..cf7a1df 100644
--- a/server/service/instance_validator.go
+++ b/server/service/instance_validator.go
@@ -26,6 +26,7 @@ import (
var (
findInstanceReqValidator validate.Validator
+ batchFindInstanceReqValidator validate.Validator
getInstanceReqValidator validate.Validator
updateInstanceReqValidator validate.Validator
registerInstanceReqValidator validate.Validator
@@ -56,6 +57,17 @@ func FindInstanceReqValidator() *validate.Validator {
})
}
+func BatchFindInstanceReqValidator() *validate.Validator {
+ return batchFindInstanceReqValidator.Init(func(v *validate.Validator) {
+ var findServiceValidator validate.Validator
+ findServiceValidator.AddRule("Service",
&validate.ValidateRule{Min: 1})
+ findServiceValidator.AddSub("Service", ExistenceReqValidator())
+ v.AddRule("ConsumerServiceId",
GetInstanceReqValidator().GetRule("ConsumerServiceId"))
+ v.AddRule("Services", &validate.ValidateRule{Min: 1})
+ v.AddSub("Services", &findServiceValidator)
+ })
+}
+
func GetInstanceReqValidator() *validate.Validator {
return getInstanceReqValidator.Init(func(v *validate.Validator) {
v.AddRule("ConsumerServiceId", &validate.ValidateRule{Max: 64,
Regexp: serviceIdRegex})
diff --git a/server/service/util/instance_util.go
b/server/service/util/instance_util.go
index 302347f..1e0e791 100644
--- a/server/service/util/instance_util.go
+++ b/server/service/util/instance_util.go
@@ -272,3 +272,27 @@ func UpdateInstance(ctx context.Context, domainProject
string, instance *pb.Micr
}
return nil
}
+
+func AppendFindResponse(ctx context.Context, index int64, find
*pb.FindInstancesResponse,
+ updatedResult *[]*pb.FindResult, notModifiedResult *[]int64,
failedResult **pb.FindFailedResult) {
+ if code := find.GetResponse().GetCode(); code != pb.Response_SUCCESS {
+ if *failedResult == nil {
+ *failedResult = &pb.FindFailedResult{
+ Error: scerr.NewError(code,
find.GetResponse().GetMessage()),
+ }
+ }
+ (*failedResult).Indexes = append((*failedResult).Indexes, index)
+ return
+ }
+ iv, _ := ctx.Value(CTX_REQUEST_REVISION).(string)
+ ov, _ := ctx.Value(CTX_RESPONSE_REVISION).(string)
+ if len(iv) > 0 && iv == ov {
+ *notModifiedResult = append(*notModifiedResult, index)
+ return
+ }
+ *updatedResult = append(*updatedResult, &pb.FindResult{
+ Index: index,
+ Instances: find.Instances,
+ Rev: ov,
+ })
+}
diff --git a/server/service/util/instance_util_test.go
b/server/service/util/instance_util_test.go
index abe5182..50087f2 100644
--- a/server/service/util/instance_util_test.go
+++ b/server/service/util/instance_util_test.go
@@ -20,6 +20,7 @@ import (
"github.com/apache/servicecomb-service-center/pkg/util"
"github.com/apache/servicecomb-service-center/server/core/proto"
pb "github.com/apache/servicecomb-service-center/server/core/proto"
+ scerr "github.com/apache/servicecomb-service-center/server/error"
"golang.org/x/net/context"
"testing"
)
@@ -115,3 +116,77 @@ func TestUpdateInstance(t *testing.T) {
t.Fatalf(`UpdateInstance CTX_NOCACHE failed`)
}
}
+
+func TestAppendFindResponse(t *testing.T) {
+ ctx := context.Background()
+ var (
+ find pb.FindInstancesResponse
+ updatedResult []*pb.FindResult
+ notModifiedResult []int64
+ failedResult *pb.FindFailedResult
+ )
+ AppendFindResponse(ctx, 1, &find, &updatedResult, &notModifiedResult,
&failedResult)
+ if updatedResult == nil || notModifiedResult != nil || failedResult !=
nil {
+ t.Fatal("TestAppendFindResponse failed")
+ }
+ if updatedResult[0].Index != 1 {
+ t.Fatal("TestAppendFindResponse failed")
+ }
+
+ updatedResult = nil
+ cloneCtx := context.WithValue(ctx, CTX_RESPONSE_REVISION, "1")
+ AppendFindResponse(cloneCtx, 1, &find, &updatedResult,
&notModifiedResult, &failedResult)
+ if updatedResult == nil || notModifiedResult != nil || failedResult !=
nil {
+ t.Fatal("TestAppendFindResponse failed")
+ }
+ if updatedResult[0].Index != 1 || updatedResult[0].Rev != "1" {
+ t.Fatal("TestAppendFindResponse failed")
+ }
+
+ updatedResult = nil
+ cloneCtx = context.WithValue(ctx, CTX_REQUEST_REVISION, "1")
+ cloneCtx = context.WithValue(cloneCtx, CTX_RESPONSE_REVISION, "1")
+ AppendFindResponse(cloneCtx, 1, &find, &updatedResult,
&notModifiedResult, &failedResult)
+ if updatedResult != nil || notModifiedResult == nil || failedResult !=
nil {
+ t.Fatal("TestAppendFindResponse failed")
+ }
+ if notModifiedResult[0] != 1 {
+ t.Fatal("TestAppendFindResponse failed")
+ }
+
+ notModifiedResult = nil
+ find.Response = pb.CreateResponse(scerr.ErrInternal, "test")
+ AppendFindResponse(ctx, 1, &find, &updatedResult, &notModifiedResult,
&failedResult)
+ if updatedResult != nil || notModifiedResult != nil || failedResult ==
nil {
+ t.Fatal("TestAppendFindResponse failed")
+ }
+ if failedResult.Error.Code != scerr.ErrInternal {
+ t.Fatal("TestAppendFindResponse failed")
+ }
+ find.Response = pb.CreateResponse(scerr.ErrInvalidParams, "test")
+ AppendFindResponse(ctx, 2, &find, &updatedResult, &notModifiedResult,
&failedResult)
+ if updatedResult != nil || notModifiedResult != nil || failedResult ==
nil {
+ t.Fatal("TestAppendFindResponse failed")
+ }
+ if failedResult.Error.Code != scerr.ErrInternal {
+ t.Fatal("TestAppendFindResponse failed")
+ }
+
+ failedResult = nil
+ find.Response = nil
+ AppendFindResponse(ctx, 1, &find, &updatedResult, &notModifiedResult,
&failedResult)
+ AppendFindResponse(ctx, 2, &find, &updatedResult, &notModifiedResult,
&failedResult)
+ cloneCtx = context.WithValue(ctx, CTX_REQUEST_REVISION, "1")
+ cloneCtx = context.WithValue(cloneCtx, CTX_RESPONSE_REVISION, "1")
+ AppendFindResponse(cloneCtx, 3, &find, &updatedResult,
&notModifiedResult, &failedResult)
+ AppendFindResponse(cloneCtx, 4, &find, &updatedResult,
&notModifiedResult, &failedResult)
+ find.Response = pb.CreateResponse(scerr.ErrInternal, "test")
+ AppendFindResponse(ctx, 5, &find, &updatedResult, &notModifiedResult,
&failedResult)
+ AppendFindResponse(ctx, 6, &find, &updatedResult, &notModifiedResult,
&failedResult)
+ if updatedResult == nil || notModifiedResult == nil || failedResult ==
nil {
+ t.Fatal("TestAppendFindResponse failed")
+ }
+ if len(updatedResult) != 2 || len(notModifiedResult) != 2 ||
len(failedResult.Indexes) != 2 {
+ t.Fatal("TestAppendFindResponse failed")
+ }
+}
diff --git a/server/service/validate.go b/server/service/validate.go
index c13728a..825703d 100644
--- a/server/service/validate.go
+++ b/server/service/validate.go
@@ -74,6 +74,8 @@ func Validate(v interface{}) error {
return RegisterInstanceReqValidator().Validate(v)
case *pb.FindInstancesRequest:
return FindInstanceReqValidator().Validate(v)
+ case *pb.BatchFindInstancesRequest:
+ return BatchFindInstanceReqValidator().Validate(v)
case *pb.HeartbeatRequest, *pb.UnregisterInstanceRequest:
return HeartbeatReqValidator().Validate(v)
case *pb.UpdateInstancePropsRequest: