This is an automated email from the ASF dual-hosted git repository.

tianxiaoliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/servicecomb-service-center.git


The following commit(s) were added to refs/heads/master by this push:
     new fd9e2f2  fast register (#948) (#953)
fd9e2f2 is described below

commit fd9e2f2ebc2577b2de9a3313f0e9858799edfe8c
Author: fuziye01 <[email protected]>
AuthorDate: Wed Apr 28 17:57:57 2021 +0800

    fast register (#948) (#953)
---
 README.md                                        |   1 +
 datasource/mongo/event/instance_event_handler.go |   1 +
 datasource/mongo/fast_register_config.go         |  40 +++++
 datasource/mongo/fast_register_inst_service.go   |  55 +++++++
 datasource/mongo/fast_register_test.go           | 146 ++++++++++++++++++
 datasource/mongo/fast_register_timer.go          | 184 +++++++++++++++++++++++
 datasource/mongo/mongo.go                        |  24 ++-
 datasource/mongo/ms.go                           | 141 ++++++++++++++---
 datasource/mongo/types.go                        |  13 ++
 docs/user-guides.rst                             |   1 +
 docs/user-guides/fast-registration.md            |  76 ++++++++++
 docs/user-guides/fast_register_design.png        | Bin 0 -> 5385994 bytes
 etc/conf/app.yaml                                |   6 +
 13 files changed, 659 insertions(+), 29 deletions(-)

diff --git a/README.md b/README.md
index 8d3379d..778c6f7 100644
--- a/README.md
+++ b/README.md
@@ -31,6 +31,7 @@ It provides out of box support for metrics and tracing. It 
has a web portal to m
  - 
**[`Datacenters`](https://service-center.readthedocs.io/en/latest/dev-guides/multidcs.html)**:
 Additional layer of abstraction to clusters deployed in multiple datacenters
  - 
**[`Aggregation`](https://service-center.readthedocs.io/en/latest/design-guides/design.html)**:
 Able to aggregate microservices from multiple registry platforms and
     support platform registry and client side registry at the same time
+ - 
**[`FastRegister`](https://service-center.readthedocs.io/en/latest/user-guides/fast-registration.html)**:
 Fast register instance to service center
 
 ## Documentation
 
diff --git a/datasource/mongo/event/instance_event_handler.go 
b/datasource/mongo/event/instance_event_handler.go
index f367c15..0d0bebd 100644
--- a/datasource/mongo/event/instance_event_handler.go
+++ b/datasource/mongo/event/instance_event_handler.go
@@ -75,6 +75,7 @@ func (h InstanceEventHandler) OnEvent(evt sd.MongoEvent) {
                metrics.ReportInstances(instance.Domain, increaseOne)
                frameworkName, frameworkVersion := getFramework(microService)
                metrics.ReportFramework(instance.Domain, instance.Project, 
frameworkName, frameworkVersion, increaseOne)
+               return
        case discovery.EVT_CREATE:
                metrics.ReportInstances(instance.Domain, increaseOne)
        case discovery.EVT_DELETE:
diff --git a/datasource/mongo/fast_register_config.go 
b/datasource/mongo/fast_register_config.go
new file mode 100644
index 0000000..753d746
--- /dev/null
+++ b/datasource/mongo/fast_register_config.go
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except request compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to request writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package mongo
+
+import (
+       "sync"
+
+       "github.com/apache/servicecomb-service-center/server/config"
+)
+
+var (
+       configOnce    sync.Once
+       fastRegConfig FastRegConfig
+)
+
+type FastRegConfig struct {
+       QueueSize int
+}
+
+func FastRegConfiguration() *FastRegConfig {
+       configOnce.Do(func() {
+               fastRegConfig.QueueSize = 
config.GetInt("registry.fastRegistration.queueSize", 0)
+       })
+       return &fastRegConfig
+}
diff --git a/datasource/mongo/fast_register_inst_service.go 
b/datasource/mongo/fast_register_inst_service.go
new file mode 100644
index 0000000..25eaac3
--- /dev/null
+++ b/datasource/mongo/fast_register_inst_service.go
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except request compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to request writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package mongo
+
+var fastRegisterInstanceService *RegisterFastInstanceService
+
+type RegisterFastInstanceService struct {
+       InstEventCh  chan *InstanceRegisterEvent
+       FailedInstCh chan *InstanceRegisterEvent
+}
+
+func NewFastRegisterInstanceService() *RegisterFastInstanceService {
+       fastRegisterQueueSize := fastRegConfig.QueueSize
+       return &RegisterFastInstanceService{
+               InstEventCh:  make(chan *InstanceRegisterEvent, 
fastRegisterQueueSize),
+               FailedInstCh: make(chan *InstanceRegisterEvent, 
fastRegisterQueueSize),
+       }
+}
+
+func GetFastRegisterInstanceService() *RegisterFastInstanceService {
+       return fastRegisterInstanceService
+}
+
+func SetFastRegisterInstanceService(service *RegisterFastInstanceService) {
+       fastRegisterInstanceService = service
+}
+
+func (s *RegisterFastInstanceService) AddEvent(event *InstanceRegisterEvent) {
+       s.InstEventCh <- event
+}
+
+func (s *RegisterFastInstanceService) AddFailedEvent(event 
*InstanceRegisterEvent) {
+       s.FailedInstCh <- event
+}
+
+func (s *RegisterFastInstanceService) AddFailedEvents(events 
[]*InstanceRegisterEvent) {
+       for _, event := range events {
+               s.FailedInstCh <- event
+       }
+}
diff --git a/datasource/mongo/fast_register_test.go 
b/datasource/mongo/fast_register_test.go
new file mode 100644
index 0000000..ee9e790
--- /dev/null
+++ b/datasource/mongo/fast_register_test.go
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except request compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to request writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package mongo_test
+
+import (
+       "context"
+       "testing"
+       "time"
+
+       "github.com/apache/servicecomb-service-center/datasource"
+       "github.com/apache/servicecomb-service-center/datasource/mongo"
+       pb "github.com/go-chassis/cari/discovery"
+       "github.com/stretchr/testify/assert"
+)
+
+func TestInstance_BatchCreate(t *testing.T) {
+       var serviceID string
+
+       mongo.FastRegConfiguration().QueueSize = 100
+       fastRegisterService := mongo.NewFastRegisterInstanceService()
+       mongo.SetFastRegisterInstanceService(fastRegisterService)
+
+       fastRegisterTimeTask := mongo.NewRegisterTimeTask()
+       fastRegisterTimeTask.Start()
+
+       t.Run("given service to register instance expect register success", 
func(t *testing.T) {
+               respCreateService, err := 
datasource.Instance().RegisterService(getContext(), &pb.CreateServiceRequest{
+                       Service: &pb.MicroService{
+                               ServiceName: "create_instance_service_ms",
+                               AppId:       "create_instance_ms",
+                               Version:     "1.0.0",
+                               Level:       "FRONT",
+                               Status:      pb.MS_UP,
+                       },
+               })
+
+               assert.NoError(t, err)
+               assert.Equal(t, pb.ResponseSuccess, 
respCreateService.Response.GetCode())
+               serviceID = respCreateService.ServiceId
+
+       })
+
+       t.Run("when instance request is 3 expect batch register success", 
func(t *testing.T) {
+               request := &pb.RegisterInstanceRequest{
+                       Instance: &pb.MicroServiceInstance{
+                               ServiceId: serviceID,
+                               Endpoints: []string{
+                                       "createInstance_ms:127.0.0.1:8080",
+                               },
+                               HostName: "UT-HOST",
+                               Status:   pb.MSI_UP,
+                       },
+               }
+
+               getInstsReq := &pb.GetInstancesRequest{
+                       ProviderServiceId: serviceID,
+               }
+
+               getInstsResp, err := 
datasource.Instance().GetInstances(context.TODO(), getInstsReq)
+               assert.NoError(t, err)
+               beforLen := len(getInstsResp.Instances)
+
+               instanceBatchLen := 3
+               for i := 0; i < instanceBatchLen; i++ {
+                       event := &mongo.InstanceRegisterEvent{Ctx: 
context.TODO(), Request: request}
+                       fastRegisterService.AddEvent(event)
+               }
+               assert.Equal(t, instanceBatchLen, 
len(mongo.GetFastRegisterInstanceService().InstEventCh))
+               time.Sleep(500 * time.Millisecond)
+
+               assert.Equal(t, 0, 
len(mongo.GetFastRegisterInstanceService().InstEventCh))
+               assert.Equal(t, 0, 
len(mongo.GetFastRegisterInstanceService().FailedInstCh))
+
+               //if mongo is not replSet, batch register will failed, should 
wait failed instance register
+               time.Sleep(5 * time.Second)
+
+               getInstsResp, err = 
datasource.Instance().GetInstances(context.TODO(), getInstsReq)
+               assert.NoError(t, err)
+               afterLen := len(getInstsResp.Instances)
+               assert.Equal(t, instanceBatchLen, afterLen-beforLen)
+       })
+
+       t.Run("when instanceID is custom expect register success", func(t 
*testing.T) {
+
+               request := &pb.RegisterInstanceRequest{
+                       Instance: &pb.MicroServiceInstance{
+                               InstanceId: "customId_ms",
+                               ServiceId:  serviceID,
+                               Endpoints: []string{
+                                       "createInstance_ms:127.0.0.1:8080",
+                               },
+                               HostName: "UT-HOST",
+                               Status:   pb.MSI_UP,
+                       },
+               }
+
+               event := &mongo.InstanceRegisterEvent{Ctx: context.TODO(), 
Request: request}
+               fastRegisterService.AddEvent(event)
+
+               assert.Equal(t, 1, 
len(mongo.GetFastRegisterInstanceService().InstEventCh))
+               time.Sleep(500 * time.Millisecond)
+
+               assert.Equal(t, 0, 
len(mongo.GetFastRegisterInstanceService().InstEventCh))
+               assert.Equal(t, 0, 
len(mongo.GetFastRegisterInstanceService().FailedInstCh))
+       })
+
+       t.Run("when has failed instance expect retry register success", func(t 
*testing.T) {
+
+               request := &pb.RegisterInstanceRequest{
+                       Instance: &pb.MicroServiceInstance{
+                               InstanceId: "failedInstanceId",
+                               ServiceId:  serviceID,
+                               Endpoints: []string{
+                                       "createInstance_ms:127.0.0.1:8080",
+                               },
+                               HostName: "UT-HOST",
+                               Status:   pb.MSI_UP,
+                       },
+               }
+
+               event := &mongo.InstanceRegisterEvent{Ctx: context.TODO(), 
Request: request}
+               fastRegisterService.AddFailedEvent(event)
+
+               time.Sleep(100 * time.Millisecond)
+
+               assert.Equal(t, 0, 
len(mongo.GetFastRegisterInstanceService().InstEventCh))
+               assert.Equal(t, 0, 
len(mongo.GetFastRegisterInstanceService().FailedInstCh))
+       })
+
+       fastRegisterTimeTask.Stop()
+}
diff --git a/datasource/mongo/fast_register_timer.go 
b/datasource/mongo/fast_register_timer.go
new file mode 100644
index 0000000..1c3e2f0
--- /dev/null
+++ b/datasource/mongo/fast_register_timer.go
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package mongo
+
+import (
+       "context"
+       "errors"
+       "fmt"
+       "runtime"
+       "time"
+
+       "github.com/apache/servicecomb-service-center/pkg/gopool"
+       "github.com/apache/servicecomb-service-center/pkg/log"
+       "github.com/apache/servicecomb-service-center/pkg/util"
+)
+
+const (
+       loopTime              = 100 * time.Millisecond
+       batchLen              = 10000
+       fuseMinCount          = 3
+       fuseTime              = 5 * time.Second
+       maxRegisterFailedTime = 500
+       ctxCancelTimeOut      = 60 * time.Second
+)
+
+var fastRegisterTimeTask *FastRegisterTimeTask
+
+type FastRegisterTimeTask struct {
+       goroutine *gopool.Pool
+}
+
+func NewRegisterTimeTask() *FastRegisterTimeTask {
+       return &FastRegisterTimeTask{
+               goroutine: gopool.New(context.Background()),
+       }
+}
+
+func (rt *FastRegisterTimeTask) Start() {
+       gopool.Go(fastRegisterTimeTask.loopRegister)
+}
+
+func (rt *FastRegisterTimeTask) Stop() {
+       rt.goroutine.Close(true)
+}
+
+func (rt *FastRegisterTimeTask) loopRegister(ctx context.Context) {
+       blockCh := make(chan struct{}, runtime.NumCPU()-1)
+       failedCount := make(chan int, 1)
+       failedCount <- 0
+       ticker := time.NewTicker(loopTime)
+
+       for {
+               select {
+               case <-ctx.Done():
+                       // server shutdown
+                       return
+               case <-ticker.C:
+                       length := 
len(GetFastRegisterInstanceService().InstEventCh)
+                       if length == 0 {
+                               continue
+                       }
+
+                       // fuse is triggered if failed registry counts more 
than fuseMinCount
+                       count := <-failedCount
+                       if count > fuseMinCount {
+                               time.Sleep(fuseTime)
+                       }
+                       failedCount <- count
+
+                       events := rt.generateEvents(length)
+                       blockCh <- struct{}{}
+                       go rt.RegisterInstancesAsync(events, blockCh, 
failedCount)
+
+               case event, ok := 
<-GetFastRegisterInstanceService().FailedInstCh:
+                       // if instance batch register failed, register it single
+                       if !ok {
+                               log.Error("failed instance channel is error", 
errors.New("channel closed"))
+                               continue
+                       }
+
+                       if event.failedTime > maxRegisterFailedTime {
+                               log.Error(fmt.Sprintf("instance register retry 
time is more than max register time:%d, "+
+                                       "the instance params maybe wrong, drop 
it", maxRegisterFailedTime),
+                                       errors.New("retry register instance 
failed"))
+                               continue
+                       }
+
+                       blockCh <- struct{}{}
+
+                       go rt.RegisterInstance(event, blockCh)
+               }
+       }
+}
+
+func (rt *FastRegisterTimeTask) generateEvents(length int) 
[]*InstanceRegisterEvent {
+       // if channel len >= batch len, use batch len, otherwise use channel len
+       if length >= batchLen {
+               length = batchLen
+       }
+
+       events := make([]*InstanceRegisterEvent, 0)
+
+       for i := 0; i < length; i++ {
+               event, ok := <-GetFastRegisterInstanceService().InstEventCh
+
+               refreshCanceledCtx(event)
+
+               if !ok {
+                       log.Error("instance event channel is error", 
errors.New("channel closed"))
+                       continue
+               }
+               events = append(events, event)
+       }
+       return events
+}
+
+func (rt *FastRegisterTimeTask) RegisterInstancesAsync(events 
[]*InstanceRegisterEvent, blockCh chan struct{}, failedCount chan int) {
+       defer endBlock(blockCh)
+
+       ctx, cancel := context.WithTimeout(context.Background(), 
ctxCancelTimeOut)
+       defer cancel()
+
+       _, err := RegisterInstanceBatch(ctx, events)
+
+       count := <-failedCount
+
+       if err != nil {
+               //failed count
+               count = count + 1
+               failedCount <- count
+
+               //add to failed instance channel, will retry register
+               log.Error("register instances err, retry it", err)
+               GetFastRegisterInstanceService().AddFailedEvents(events)
+               return
+       }
+
+       failedCount <- 0
+}
+
+func (rt *FastRegisterTimeTask) RegisterInstance(event *InstanceRegisterEvent, 
blockCh chan struct{}) {
+       defer endBlock(blockCh)
+
+       cancel := refreshCanceledCtx(event)
+       defer cancel()
+
+       _, err := RegisterInstanceSingle(event.Ctx, event.Request, 
event.isCustomID)
+
+       if err != nil {
+               log.Error(fmt.Sprintf("register instance:%s failed again, 
failed times:%d",
+                       event.Request.Instance.InstanceId, event.failedTime), 
err)
+               event.failedTime = event.failedTime + 1
+               GetFastRegisterInstanceService().AddFailedEvent(event)
+       }
+}
+
+func endBlock(blockCh chan struct{}) {
+       <-blockCh
+}
+
+func refreshCanceledCtx(event *InstanceRegisterEvent) context.CancelFunc {
+       oldCtx := event.Ctx
+       newCtx, cancel := context.WithTimeout(context.Background(), 
ctxCancelTimeOut)
+       newCtx = util.SetDomain(newCtx, util.ParseDomain(oldCtx))
+       newCtx = util.SetProject(newCtx, util.ParseProject(oldCtx))
+       newCtx = util.SetContext(newCtx, util.CtxRemoteIP, 
util.ParseProject(oldCtx))
+       event.Ctx = newCtx
+       return cancel
+}
diff --git a/datasource/mongo/mongo.go b/datasource/mongo/mongo.go
index 94fec3c..8d882c7 100644
--- a/datasource/mongo/mongo.go
+++ b/datasource/mongo/mongo.go
@@ -20,11 +20,6 @@ package mongo
 import (
        "context"
        "fmt"
-       "github.com/apache/servicecomb-service-center/pkg/util"
-
-       "github.com/go-chassis/go-chassis/v2/storage"
-       "go.mongodb.org/mongo-driver/mongo"
-       "go.mongodb.org/mongo-driver/mongo/options"
 
        "github.com/apache/servicecomb-service-center/datasource"
        "github.com/apache/servicecomb-service-center/datasource/mongo/client"
@@ -33,7 +28,11 @@ import (
        "github.com/apache/servicecomb-service-center/datasource/mongo/sd"
        mutil 
"github.com/apache/servicecomb-service-center/datasource/mongo/util"
        "github.com/apache/servicecomb-service-center/pkg/log"
+       "github.com/apache/servicecomb-service-center/pkg/util"
        "github.com/apache/servicecomb-service-center/server/config"
+       "github.com/go-chassis/go-chassis/v2/storage"
+       "go.mongodb.org/mongo-driver/mongo"
+       "go.mongodb.org/mongo-driver/mongo/options"
 )
 
 const defaultExpireTime = 300
@@ -77,6 +76,10 @@ func (ds *DataSource) initialize() error {
        }
        // create db index and validator
        EnsureDB()
+
+       // if fast register enabled, init fast register service
+       initFastRegister()
+
        // init cache
        ds.initStore()
        return nil
@@ -234,3 +237,14 @@ func (ds *DataSource) initStore() {
        sd.Store().Run()
        <-sd.Store().Ready()
 }
+
+func initFastRegister() {
+       fastRegConfig := FastRegConfiguration()
+
+       if fastRegConfig.QueueSize > 0 {
+               fastRegisterService := NewFastRegisterInstanceService()
+               SetFastRegisterInstanceService(fastRegisterService)
+
+               NewRegisterTimeTask().Start()
+       }
+}
diff --git a/datasource/mongo/ms.go b/datasource/mongo/ms.go
index bf7773d..175d4d0 100644
--- a/datasource/mongo/ms.go
+++ b/datasource/mongo/ms.go
@@ -23,7 +23,6 @@ import (
        "encoding/json"
        "errors"
        "fmt"
-       "github.com/apache/servicecomb-service-center/datasource/cache"
        "reflect"
        "regexp"
        "sort"
@@ -38,6 +37,7 @@ import (
        "go.mongodb.org/mongo-driver/mongo/options"
 
        "github.com/apache/servicecomb-service-center/datasource"
+       "github.com/apache/servicecomb-service-center/datasource/cache"
        "github.com/apache/servicecomb-service-center/datasource/mongo/client"
        
"github.com/apache/servicecomb-service-center/datasource/mongo/client/dao"
        
"github.com/apache/servicecomb-service-center/datasource/mongo/client/model"
@@ -1404,13 +1404,80 @@ func getServiceDetailUtil(ctx context.Context, mgs 
*model.Service, countOnly boo
 }
 
 // Instance management
-func (ds *DataSource) RegisterInstance(ctx context.Context, request 
*discovery.RegisterInstanceRequest) (*discovery.RegisterInstanceResponse, 
error) {
+func (ds *DataSource) RegisterInstance(ctx context.Context,
+       request *discovery.RegisterInstanceRequest) 
(*discovery.RegisterInstanceResponse, error) {
+
+       isCustomID := true
+
+       if len(request.Instance.InstanceId) == 0 {
+               isCustomID = false
+               request.Instance.InstanceId = 
uuid.Generator().GetInstanceID(ctx)
+       }
+
+       // if queueSize is more than 0 and channel is not full, then do fast 
register instance
+       if fastRegConfig.QueueSize > 0 && 
len(GetFastRegisterInstanceService().InstEventCh) < fastRegConfig.QueueSize {
+               // fast register, just add instance to channel and batch 
register them later
+               event := &InstanceRegisterEvent{ctx, request, isCustomID, 0}
+               GetFastRegisterInstanceService().AddEvent(event)
+
+               return &discovery.RegisterInstanceResponse{
+                       Response:   
discovery.CreateResponse(discovery.ResponseSuccess, "Register service instance 
successfully."),
+                       InstanceId: request.Instance.InstanceId,
+               }, nil
+       }
+
+       return RegisterInstanceSingle(ctx, request, isCustomID)
+}
+
+func RegisterInstanceSingle(ctx context.Context, request 
*discovery.RegisterInstanceRequest,
+       isUserDefinedID bool) (*discovery.RegisterInstanceResponse, error) {
+
+       resp, needRegister, err := preProcessRegister(ctx, request.Instance, 
isUserDefinedID)
+
+       if err != nil || !needRegister {
+               log.Error("pre process instance err, or instance already 
existed", err)
+               return resp, err
+       }
+
+       return registryInstance(ctx, request)
+}
+
+func RegisterInstanceBatch(ctx context.Context, events 
[]*InstanceRegisterEvent) (*discovery.RegisterInstanceResponse, error) {
+       instances := make([]interface{}, len(events))
+
+       for i, event := range events {
+               eventCtx := event.Ctx
+               instance := event.Request.Instance
+
+               resp, needRegister, err := preProcessRegister(eventCtx, 
instance, event.isCustomID)
+
+               if err != nil || !needRegister {
+                       log.Error("pre process instance err, or instance 
existed", err)
+                       return resp, err
+               }
+
+               domain := util.ParseDomain(eventCtx)
+               project := util.ParseProject(eventCtx)
+
+               data := model.Instance{
+                       Domain:      domain,
+                       Project:     project,
+                       RefreshTime: time.Now(),
+                       Instance:    instance,
+               }
+               instances[i] = data
+       }
+
+       return registryInstances(ctx, instances)
+}
+
+func preProcessRegister(ctx context.Context, instance 
*discovery.MicroServiceInstance,
+       isUserDefinedID bool) (*discovery.RegisterInstanceResponse, bool, 
error) {
        remoteIP := util.GetIPFromContext(ctx)
-       instance := request.Instance
 
        // 允许自定义 id
-       if len(instance.InstanceId) > 0 {
-               resp, err := ds.Heartbeat(ctx, &discovery.HeartbeatRequest{
+       if isUserDefinedID {
+               resp, err := datasource.Instance().Heartbeat(ctx, 
&discovery.HeartbeatRequest{
                        InstanceId: instance.InstanceId,
                        ServiceId:  instance.ServiceId,
                })
@@ -1419,7 +1486,7 @@ func (ds *DataSource) RegisterInstance(ctx 
context.Context, request *discovery.R
                                instance.ServiceId, instance.Endpoints, 
instance.HostName, remoteIP), err)
                        return &discovery.RegisterInstanceResponse{
                                Response: 
discovery.CreateResponse(discovery.ErrInternal, err.Error()),
-                       }, nil
+                       }, false, nil
                }
                switch resp.Response.GetCode() {
                case discovery.ResponseSuccess:
@@ -1428,24 +1495,17 @@ func (ds *DataSource) RegisterInstance(ctx 
context.Context, request *discovery.R
                        return &discovery.RegisterInstanceResponse{
                                Response:   resp.Response,
                                InstanceId: instance.InstanceId,
-                       }, nil
+                       }, false, nil
                case discovery.ErrInstanceNotExists:
-                       // register a new one
-                       if request.Instance.HealthCheck == nil {
-                               request.Instance.HealthCheck = 
&discovery.HealthCheck{
-                                       Mode:     discovery.CHECK_BY_HEARTBEAT,
-                                       Interval: 
apt.RegistryDefaultLeaseRenewalinterval,
-                                       Times:    
apt.RegistryDefaultLeaseRetrytimes,
-                               }
-                       }
-                       return registryInstance(ctx, request)
+                       //register a new one
                default:
                        log.Error(fmt.Sprintf("register instance failed, reuse 
instance %s %s, operator %s",
                                instance.ServiceId, instance.InstanceId, 
remoteIP), err)
                        return &discovery.RegisterInstanceResponse{
                                Response: resp.Response,
-                       }, err
+                       }, false, err
                }
+
        }
 
        if err := preProcessRegisterInstance(ctx, instance); err != nil {
@@ -1453,9 +1513,13 @@ func (ds *DataSource) RegisterInstance(ctx 
context.Context, request *discovery.R
                        instance.ServiceId, instance.Endpoints, 
instance.HostName, remoteIP), err)
                return &discovery.RegisterInstanceResponse{
                        Response: discovery.CreateResponseWithSCErr(err),
-               }, nil
+               }, false, nil
        }
-       return registryInstance(ctx, request)
+
+       return &discovery.RegisterInstanceResponse{
+               Response:   discovery.CreateResponse(discovery.ResponseSuccess, 
"process success"),
+               InstanceId: instance.InstanceId,
+       }, true, nil
 }
 
 // GetInstance returns instance under the current domain
@@ -1992,6 +2056,24 @@ func registryInstance(ctx context.Context, request 
*discovery.RegisterInstanceRe
        }, nil
 }
 
+func registryInstances(ctx context.Context, instances []interface{}) 
(*discovery.RegisterInstanceResponse, error) {
+       opts := options.InsertManyOptions{}
+       opts.SetOrdered(false)
+       opts.SetBypassDocumentValidation(true)
+       _, err := client.GetMongoClient().BatchInsert(ctx, 
model.CollectionInstance, instances, &opts)
+
+       if err != nil {
+               log.Error("Batch register instance failed", err)
+               return &discovery.RegisterInstanceResponse{
+                       Response: 
discovery.CreateResponse(discovery.ErrUnavailableBackend, err.Error()),
+               }, err
+       }
+
+       return &discovery.RegisterInstanceResponse{
+               Response: discovery.CreateResponse(discovery.ResponseSuccess, 
"Register service instance successfully."),
+       }, nil
+}
+
 func (ds *DataSource) findSharedServiceInstance(ctx context.Context, request 
*discovery.FindInstancesRequest, provider *discovery.MicroServiceKey, rev 
string) (*discovery.FindInstancesResponse, error) {
        var err error
        // it means the shared micro-services must be the same env with SC.
@@ -2367,12 +2449,23 @@ func preProcessRegisterInstance(ctx context.Context, 
instance *discovery.MicroSe
                        instance.HealthCheck.Times = retryTimes
                }
        }
-       filter := mutil.NewBasicFilter(ctx, 
mutil.ServiceServiceID(instance.ServiceId))
-       microservice, err := dao.GetService(ctx, filter)
-       if err != nil {
-               return discovery.NewError(discovery.ErrServiceNotExists, 
"invalid 'serviceID' in request body.")
+
+       cacheService, ok := cache.GetServiceByID(instance.ServiceId)
+
+       var microService *discovery.MicroService
+       if ok {
+               microService = cacheService.Service
+               instance.Version = microService.Version
+       } else {
+               filter := mutil.NewBasicFilter(ctx, 
mutil.ServiceServiceID(instance.ServiceId))
+               microservice, err := dao.GetService(ctx, filter)
+               if err != nil {
+                       log.Error("Get service failed", err)
+                       return 
discovery.NewError(discovery.ErrServiceNotExists, "invalid 'serviceID' in 
request body.")
+               }
+               instance.Version = microservice.Service.Version
        }
-       instance.Version = microservice.Service.Version
+
        return nil
 }
 
diff --git a/datasource/mongo/types.go b/datasource/mongo/types.go
new file mode 100644
index 0000000..ea44328
--- /dev/null
+++ b/datasource/mongo/types.go
@@ -0,0 +1,13 @@
+package mongo
+
+import (
+       "context"
+       "github.com/go-chassis/cari/discovery"
+)
+
+type InstanceRegisterEvent struct {
+       Ctx        context.Context
+       Request    *discovery.RegisterInstanceRequest
+       isCustomID bool
+       failedTime int
+}
diff --git a/docs/user-guides.rst b/docs/user-guides.rst
index bdfd5a6..4376d3b 100644
--- a/docs/user-guides.rst
+++ b/docs/user-guides.rst
@@ -11,3 +11,4 @@ User Guides
    user-guides/sc-cluster.rst
    user-guides/integration-grafana.rst
    user-guides/rbac.md
+   user-guides/fast-registration.md
diff --git a/docs/user-guides/fast-registration.md 
b/docs/user-guides/fast-registration.md
new file mode 100644
index 0000000..05cd87c
--- /dev/null
+++ b/docs/user-guides/fast-registration.md
@@ -0,0 +1,76 @@
+# Fast Registration
+
+Fast registration feature, can support millions of instance registration.
+
+This feature is primarily used in scenarios where an ultra-high performance 
registry is required and is not recommended for scenarios where performance 
requirements are low or instance levels are small.
+
+This feature is turn off by default, if need fast register you should open the 
fast registration switch.
+
+When this feature is open, you can call register interface API, the service 
center will put instances in the queue, then direct return instanceId to users, 
at last, register registry asynchronously by timing task.
+
+## QuickStart Guide
+1.Config the fast registration queue size, to enable fast registration
+
+If queueSize is bigger than 0, fast registration will trigger
+
+The default configuration of /conf/app.yaml is as follows:
+```
+register:
+  fastRegistration:
+    # this config is only support in mongo case now
+    # if fastRegister.queueSize is > 0, enable to fast register instance, else 
register instance in normal case
+    # if fastRegister is enabled, instance will be registered asynchronously,
+    # just put instance in the queue and return instanceID, and then register 
through the timing task
+    queueSize: 0
+```
+
+Config queueSize in /conf/app.yaml, for example set queueSize to 50w
+```
+register.fastRegistration.queueSize=500000
+```
+
+2.Start service center
+```
+./service-center
+```
+
+3.Call the registry interface
+
+Call the registry interface, you will receive InstanceID soon, now a fast 
registration has been completed once
+
+* Registered instance APIs can be called concurrently
+* There is a slight delay between returning the instanceID and actually 
registering the instance to database, but 100w instance registration delays are 
within seconds
+* If more than 15 minutes did not discovery the instance ,there may be a 
problem with the environment. The client can register again with the InstanceID 
that has been generated and return to user
+
+## Process Design
+The flow chart is as follows:
+
+
+![register_image](fast_register_design.png)
+
+Normal Case:
+
+If the fast registration is enabled, it is put in the queue and eventually 
registered to MongoDB in batch by timed tasks(The time interval is 100 
millimeters)
+
+
+Abnormal Case:
+
+1. If the connection between Mongo and service center is broken, and the 
registration fails, the instance will be put into the failure queue and 
registered again
+2. If the registration fails for 3 consecutive times, the fuse will be cut off 
for 5s and resume after successful registration
+3. If a single instance fails to register for more than 500 times, the 
instance will be discarded, and the SDK will register again when the heartbeat 
finds that the instance does not exist
+
+## Attention
+1.The database with ETCD scenario does not have this feature; only the Mongo 
database scenario does
+
+2.Because the registration is asynchronous, there will be a certain amount of 
delay in the registration, the delay is basically in the second level
+
+## Performance Test
+The performance of a fast registration instance is about three times better 
than that of a normal registration
+
+best performance test:
+
+|service center| mongoDB | concurrency|tps |latency|queueSize|
+|----| ----| ----|----|----|----|
+|8u16g*2|16u32g|200|9w |1mm|100w|
+|16u32g*2|16u32g|500|15w|2mm|100w|
+
diff --git a/docs/user-guides/fast_register_design.png 
b/docs/user-guides/fast_register_design.png
new file mode 100644
index 0000000..ed14c55
Binary files /dev/null and b/docs/user-guides/fast_register_design.png differ
diff --git a/etc/conf/app.yaml b/etc/conf/app.yaml
index 1d04cc5..dc05732 100644
--- a/etc/conf/app.yaml
+++ b/etc/conf/app.yaml
@@ -132,6 +132,12 @@ registry:
       verifyPeer: false
       certFile: /opt/ssl/client.crt
       keyFile: /opt/ssl/client.key
+  fastRegistration:
+    # this config is only support in mongo case now
+    # if fastRegister.queueSize is > 0, enable to fast register instance, else 
register instance in normal case
+    # if fastRegister is enabled, instance will be registered asynchronously,
+    # just put instance in the queue and return instanceID, and then register 
through the timing task
+    queueSize: 0
 
   service:
     # enable the job clear the microservices which deploy no instance

Reply via email to