This is an automated email from the ASF dual-hosted git repository.

tianxiaoliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/servicecomb-service-center.git


The following commit(s) were added to refs/heads/master by this push:
     new 7abd499  dumpcache接口实现以及syncer代码优化 (#800)
7abd499 is described below

commit 7abd4990384fbda82dde78f10a12457a7b110cd3
Author: lilai23 <[email protected]>
AuthorDate: Wed Dec 30 14:10:49 2020 +0800

    dumpcache接口实现以及syncer代码优化 (#800)
    
    1.实现mongo的dumpcache接口
    2.syncer逻辑优化
    3.existence接口请求bug修复
    4.重构dumpcache接口
---
 client/microservice.go                |  2 +-
 datasource/{system.go => common.go}   | 15 ++-----
 datasource/etcd/system.go             |  4 +-
 datasource/etcd/system_test.go        |  4 +-
 datasource/mongo/system.go            | 34 +++++++++++++++-
 datasource/mongo/system_test.go       | 77 +++++++++++++++++++++++++++++++++++
 datasource/system.go                  |  2 +-
 server/rest/admin/service.go          |  4 +-
 syncer/grpc/grpc.go                   | 11 +++--
 syncer/server/handler.go              |  5 ++-
 syncer/servicecenter/servicecenter.go | 22 ++++++----
 syncer/servicecenter/sync.go          | 21 ++++++----
 12 files changed, 158 insertions(+), 43 deletions(-)

diff --git a/client/microservice.go b/client/microservice.go
index ae81678..a87bad0 100644
--- a/client/microservice.go
+++ b/client/microservice.go
@@ -94,7 +94,7 @@ func (c *Client) ServiceExistence(ctx context.Context, 
domain, project string, a
        query := url.Values{}
        query.Set("type", "microservice")
        query.Set("env", env)
-       query.Set("appID", appID)
+       query.Set("appId", appID)
        query.Set("serviceName", serviceName)
        query.Set("version", versionRule)
 
diff --git a/datasource/system.go b/datasource/common.go
similarity index 70%
copy from datasource/system.go
copy to datasource/common.go
index 67771bf..b732eb8 100644
--- a/datasource/system.go
+++ b/datasource/common.go
@@ -17,15 +17,8 @@
 
 package datasource
 
-import (
-       "context"
-
-       "github.com/apache/servicecomb-service-center/pkg/dump"
+const (
+       ServiceKeyPrefix  = "/cse-sr/ms/files"
+       InstanceKeyPrefix = "/cse-sr/inst/files"
+       SPLIT             = "/"
 )
-
-// SystemManager contains the APIs of system management
-type SystemManager interface {
-       DumpCache(ctx context.Context, cache *dump.Cache)
-       DLock(ctx context.Context, request *DLockRequest) error
-       DUnlock(ctx context.Context, request *DUnlockRequest) error
-}
diff --git a/datasource/etcd/system.go b/datasource/etcd/system.go
index 6c2b4b5..b7fdf64 100644
--- a/datasource/etcd/system.go
+++ b/datasource/etcd/system.go
@@ -29,7 +29,8 @@ import (
        "github.com/apache/servicecomb-service-center/pkg/gopool"
 )
 
-func (ds *DataSource) DumpCache(ctx context.Context, cache *dump.Cache) {
+func (ds *DataSource) DumpCache(ctx context.Context) *dump.Cache {
+       var cache dump.Cache
        gopool.New(ctx, gopool.Configure().Workers(2)).
                Do(func(_ context.Context) { setValue(kv.Store().Service(), 
&cache.Microservices) }).
                Do(func(_ context.Context) { 
setValue(kv.Store().ServiceIndex(), &cache.Indexes) }).
@@ -41,6 +42,7 @@ func (ds *DataSource) DumpCache(ctx context.Context, cache 
*dump.Cache) {
                Do(func(_ context.Context) { 
setValue(kv.Store().SchemaSummary(), &cache.Summaries) }).
                Do(func(_ context.Context) { setValue(kv.Store().Instance(), 
&cache.Instances) }).
                Done()
+       return &cache
 }
 
 func setValue(e sd.Adaptor, setter dump.Setter) {
diff --git a/datasource/etcd/system_test.go b/datasource/etcd/system_test.go
index ae624d7..09e11ba 100644
--- a/datasource/etcd/system_test.go
+++ b/datasource/etcd/system_test.go
@@ -20,13 +20,11 @@ import (
        "testing"
 
        "github.com/apache/servicecomb-service-center/datasource"
-       "github.com/apache/servicecomb-service-center/pkg/dump"
        "github.com/stretchr/testify/assert"
 )
 
 func TestAdminService_Dump(t *testing.T) {
        t.Log("execute 'dump' operation,when get all,should be passed")
-       var cache dump.Cache
-       datasource.Instance().DumpCache(getContext(), &cache)
+       cache := datasource.Instance().DumpCache(getContext())
        assert.Equal(t, len(cache.Indexes), len(cache.Microservices))
 }
diff --git a/datasource/mongo/system.go b/datasource/mongo/system.go
index f3d8ea8..3356920 100644
--- a/datasource/mongo/system.go
+++ b/datasource/mongo/system.go
@@ -21,11 +21,19 @@ import (
        "context"
 
        "github.com/apache/servicecomb-service-center/datasource"
+       "github.com/apache/servicecomb-service-center/datasource/mongo/sd"
        "github.com/apache/servicecomb-service-center/pkg/dump"
+       "github.com/apache/servicecomb-service-center/pkg/gopool"
+       "github.com/apache/servicecomb-service-center/pkg/util"
 )
 
-func (ds *DataSource) DumpCache(ctx context.Context, cache *dump.Cache) {
-
+func (ds *DataSource) DumpCache(ctx context.Context) *dump.Cache {
+       var cache dump.Cache
+       gopool.New(ctx, gopool.Configure().Workers(2)).
+               Do(func(_ context.Context) { 
setServiceValue(sd.Store().Service(), &cache.Microservices) }).
+               Do(func(_ context.Context) { 
setInstanceValue(sd.Store().Instance(), &cache.Instances) }).
+               Done()
+       return &cache
 }
 
 func (ds *DataSource) DLock(ctx context.Context, request 
*datasource.DLockRequest) error {
@@ -35,3 +43,25 @@ func (ds *DataSource) DLock(ctx context.Context, request 
*datasource.DLockReques
 func (ds *DataSource) DUnlock(ctx context.Context, request 
*datasource.DUnlockRequest) error {
        return nil
 }
+
+func setServiceValue(e *sd.MongoCacher, setter dump.Setter) {
+       e.Cache().ForEach(func(k string, kv interface{}) (next bool) {
+               setter.SetValue(&dump.KV{
+                       Key: 
util.StringJoin([]string{datasource.ServiceKeyPrefix, kv.(sd.Service).Domain,
+                               kv.(sd.Service).Project, k}, datasource.SPLIT),
+                       Value: kv.(sd.Service).ServiceInfo,
+               })
+               return true
+       })
+}
+
+func setInstanceValue(e *sd.MongoCacher, setter dump.Setter) {
+       e.Cache().ForEach(func(k string, kv interface{}) (next bool) {
+               setter.SetValue(&dump.KV{
+                       Key: 
util.StringJoin([]string{datasource.InstanceKeyPrefix, kv.(sd.Instance).Domain,
+                               kv.(sd.Instance).Project, 
kv.(sd.Instance).InstanceInfo.ServiceId, k}, datasource.SPLIT),
+                       Value: kv.(sd.Instance).InstanceInfo,
+               })
+               return true
+       })
+}
diff --git a/datasource/mongo/system_test.go b/datasource/mongo/system_test.go
new file mode 100644
index 0000000..ea377ec
--- /dev/null
+++ b/datasource/mongo/system_test.go
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package mongo_test
+
+import (
+       "testing"
+
+       "github.com/apache/servicecomb-service-center/datasource"
+       "github.com/apache/servicecomb-service-center/datasource/mongo"
+       "github.com/apache/servicecomb-service-center/datasource/mongo/client"
+       "github.com/apache/servicecomb-service-center/datasource/mongo/sd"
+       pb "github.com/go-chassis/cari/discovery"
+       "github.com/go-chassis/go-chassis/v2/storage"
+       "github.com/stretchr/testify/assert"
+       "go.mongodb.org/mongo-driver/bson"
+)
+
+func init() {
+       config := storage.Options{
+               URI: "mongodb://localhost:27017",
+       }
+       client.NewMongoClient(config)
+       // clean the mongodb
+       client.GetMongoClient().Delete(getContext(), mongo.CollectionInstance, 
bson.M{})
+       client.GetMongoClient().Delete(getContext(), mongo.CollectionService, 
bson.M{})
+}
+
+func TestDumpCache(t *testing.T) {
+       var store = &sd.TypeStore{}
+       store.Initialize()
+       t.Run("Register service && instance, check dump, should pass", func(t 
*testing.T) {
+               var serviceID string
+               service, err := 
datasource.Instance().RegisterService(getContext(), &pb.CreateServiceRequest{
+                       Service: &pb.MicroService{
+                               ServiceName: "create_service_test",
+                               AppId:       "create_service_appId",
+                               Version:     "1.0.0",
+                               Level:       "BACK",
+                               Status:      pb.MS_UP,
+                       },
+               })
+               assert.NoError(t, err)
+               assert.Equal(t, pb.ResponseSuccess, service.Response.GetCode())
+               serviceID = service.ServiceId
+
+               instance, err := 
datasource.Instance().RegisterInstance(getContext(), 
&pb.RegisterInstanceRequest{
+                       Instance: &pb.MicroServiceInstance{
+                               ServiceId: serviceID,
+                               Endpoints: []string{
+                                       "localhost://127.0.0.1:8080",
+                               },
+                               HostName: "HOST_TEST",
+                               Status:   pb.MSI_UP,
+                       },
+               })
+               assert.NoError(t, err)
+               assert.Equal(t, pb.ResponseSuccess, instance.Response.GetCode())
+
+               cache := datasource.Instance().DumpCache(getContext())
+               assert.NotNil(t, cache)
+       })
+}
diff --git a/datasource/system.go b/datasource/system.go
index 67771bf..153b1c7 100644
--- a/datasource/system.go
+++ b/datasource/system.go
@@ -25,7 +25,7 @@ import (
 
 // SystemManager contains the APIs of system management
 type SystemManager interface {
-       DumpCache(ctx context.Context, cache *dump.Cache)
+       DumpCache(ctx context.Context) *dump.Cache
        DLock(ctx context.Context, request *DLockRequest) error
        DUnlock(ctx context.Context, request *DUnlockRequest) error
 }
diff --git a/server/rest/admin/service.go b/server/rest/admin/service.go
index a5d9f35..299d04d 100644
--- a/server/rest/admin/service.go
+++ b/server/rest/admin/service.go
@@ -77,9 +77,7 @@ func (service *Service) dump(ctx context.Context, option 
string, resp *dump.Resp
        case "config":
                resp.AppConfig = archaius.GetConfigs()
        case "cache":
-               var cache dump.Cache
-               datasource.Instance().DumpCache(ctx, &cache)
-               resp.Cache = &cache
+               resp.Cache = datasource.Instance().DumpCache(ctx)
        case "all":
                service.dump(ctx, "info", resp)
                service.dump(ctx, "config", resp)
diff --git a/syncer/grpc/grpc.go b/syncer/grpc/grpc.go
index 6162ee6..b72b12d 100644
--- a/syncer/grpc/grpc.go
+++ b/syncer/grpc/grpc.go
@@ -19,6 +19,7 @@ package grpc
 
 import (
        "context"
+       "math"
        "net"
 
        "github.com/apache/servicecomb-service-center/pkg/log"
@@ -44,9 +45,9 @@ func NewServer(ops ...Option) (*Server, error) {
        conf := toGRPCConfig(ops...)
        var srv *grpc.Server
        if conf.tlsConfig != nil {
-               srv = 
grpc.NewServer(grpc.Creds(credentials.NewTLS(conf.tlsConfig)))
+               srv = 
grpc.NewServer(grpc.Creds(credentials.NewTLS(conf.tlsConfig)), 
grpc.MaxRecvMsgSize(math.MaxInt32), grpc.MaxSendMsgSize(math.MaxInt32))
        } else {
-               srv = grpc.NewServer()
+               srv = grpc.NewServer(grpc.MaxRecvMsgSize(math.MaxInt32), 
grpc.MaxSendMsgSize(math.MaxInt32))
        }
 
        rpc.RegisterGRpcServer(srv)
@@ -120,9 +121,11 @@ func InjectClient(injection func(conn *grpc.ClientConn), 
ops ...Option) error {
        var err error
 
        if conf.tlsConfig != nil {
-               conn, err = grpc.Dial(conf.addr, 
grpc.WithTransportCredentials(credentials.NewTLS(conf.tlsConfig)))
+               conn, err = grpc.Dial(conf.addr, 
grpc.WithTransportCredentials(credentials.NewTLS(conf.tlsConfig)),
+                       
grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(math.MaxInt32), 
grpc.MaxCallSendMsgSize(math.MaxInt32)))
        } else {
-               conn, err = grpc.Dial(conf.addr, grpc.WithInsecure())
+               conn, err = grpc.Dial(conf.addr, grpc.WithInsecure(),
+                       
grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(math.MaxInt32), 
grpc.MaxCallSendMsgSize(math.MaxInt32)))
        }
 
        if err != nil {
diff --git a/syncer/server/handler.go b/syncer/server/handler.go
index 0716db4..fae5434 100644
--- a/syncer/server/handler.go
+++ b/syncer/server/handler.go
@@ -46,8 +46,6 @@ func (s *Server) tickHandler() {
                return
        }
        log.Debugf("Handle Tick")
-       // Flush data to the storage of servicecenter
-       s.servicecenter.FlushData()
 
        // sends a UserEvent on Serf, the event will be broadcast between 
members
        s.mux.Lock()
@@ -200,6 +198,9 @@ func (s *Server) incrementUserEvent(data ...[]byte) 
(success bool) {
 }
 
 func (s *Server) notifyUserEvent(data ...[]byte) (success bool) {
+       // Flush data to the storage of servicecenter
+       s.servicecenter.FlushData()
+
        err := s.serf.UserEvent(EventDiscovered, 
util.StringToBytesWithNoCopy(s.conf.Cluster))
        if err != nil {
                log.Error("Syncer send discovered user event failed", err)
diff --git a/syncer/servicecenter/servicecenter.go 
b/syncer/servicecenter/servicecenter.go
index 1c4bd66..25bf651 100644
--- a/syncer/servicecenter/servicecenter.go
+++ b/syncer/servicecenter/servicecenter.go
@@ -84,16 +84,19 @@ func (s *servicecenter) Registry(clusterName string, data 
*pb.SyncData) {
                svc := searchService(inst, data.Services)
                if svc == nil {
                        err := errors.New("service does not exist")
-                       log.Errorf(err, "servicecenter.Registry, serviceID = 
%s, instanceId = %s", inst.ServiceId, inst.InstanceId)
+                       log.Error(fmt.Sprintf("servicecenter.Registry, 
serviceID = %s, instanceId = %s", inst.ServiceId, inst.InstanceId), err)
                        continue
                }
 
                // If the svc is in the mapping, just do nothing, if not, 
created it in servicecenter and get the new serviceID
-               svcID := s.createService(svc)
-               log.Debugf("create service success orgServiceID= %s, 
curServiceID = %s", inst.ServiceId, svcID)
+               svcID, err := s.createService(svc)
+               if err != nil {
+                       log.Error("create service failed", err)
+                       continue
+               }
 
                // If inst is in the mapping, just heart beat it in 
servicecenter
-               log.Debugf("trying to do registration of instance, instanceID = 
%s", inst.InstanceId)
+               log.Debug(fmt.Sprintf("trying to do registration of instance, 
instanceID = %s", inst.InstanceId))
                if s.heartbeatInstances(mapping, inst) {
                        continue
                }
@@ -136,10 +139,6 @@ func (s *servicecenter) IncrementRegistry(clusterName 
string, data *pb.SyncData)
                        continue
                }
 
-               // If the svc is in the mapping, just do nothing, if not, 
created it in servicecenter and get the new serviceID
-               svcID := s.createService(svc)
-               log.Debug(fmt.Sprintf("create service success orgServiceID= %s, 
curServiceID = %s", inst.ServiceId, svcID))
-
                matches := pb.Expansions(inst.Expansions).Find("action", 
map[string]string{})
                if len(matches) != 1 {
                        err := utils.ErrActionInvalid
@@ -149,6 +148,13 @@ func (s *servicecenter) IncrementRegistry(clusterName 
string, data *pb.SyncData)
                action := string(matches[0].Bytes[:])
 
                if action == string(discovery.EVT_CREATE) {
+                       // If the svc is in the mapping, just do nothing, if 
not, created it in servicecenter and get the new serviceID
+                       svcID, err := s.createService(svc)
+                       if err != nil {
+                               log.Error("create service failed", err)
+                               continue
+                       }
+
                        log.Debug(fmt.Sprintf("trying to do registration of 
instance, instanceID = %s", inst.InstanceId))
 
                        // If inst is in the mapping, just heart beat it in 
servicecenter
diff --git a/syncer/servicecenter/sync.go b/syncer/servicecenter/sync.go
index f7dcb0e..9eed8a9 100644
--- a/syncer/servicecenter/sync.go
+++ b/syncer/servicecenter/sync.go
@@ -19,6 +19,7 @@ package servicecenter
 
 import (
        "context"
+       "fmt"
 
        "github.com/apache/servicecomb-service-center/pkg/log"
        pb "github.com/apache/servicecomb-service-center/syncer/proto"
@@ -40,19 +41,25 @@ func (s *servicecenter) heartbeatInstances(mapping 
pb.SyncMapping, instance *pb.
        return true
 }
 
-func (s *servicecenter) createService(service *pb.SyncService) string {
+func (s *servicecenter) createService(service *pb.SyncService) (string, error) 
{
        ctx := context.Background()
-       serviceID, _ := s.servicecenter.ServiceExistence(ctx, 
service.DomainProject, service)
+       serviceID, err := s.servicecenter.ServiceExistence(ctx, 
service.DomainProject, service)
+       if err != nil {
+               log.Error("get service existence failed", err)
+               return "", err
+       }
+
        if serviceID == "" {
-               var err error
                serviceID, err = s.servicecenter.CreateService(ctx, 
service.DomainProject, service)
                if err != nil {
-                       log.Errorf(err, "Servicecenter create service failed")
-                       return ""
+                       log.Error("create service failed", err)
+                       return "", err
                }
-               log.Debugf("Create service successful, serviceID = %s", 
serviceID)
+               log.Debug(fmt.Sprintf("create service successful, serviceID = 
%s", serviceID))
+       } else {
+               log.Debug(fmt.Sprintf("service already exists, serviceID = %s", 
serviceID))
        }
-       return serviceID
+       return serviceID, nil
 }
 
 func (s *servicecenter) registryInstances(domainProject, serviceID string, 
instance *pb.SyncInstance) string {

Reply via email to