This is an automated email from the ASF dual-hosted git repository.
tianxiaoliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/servicecomb-service-center.git
The following commit(s) were added to refs/heads/master by this push:
new 7abd499 dumpcache接口实现以及syncer代码优化 (#800)
7abd499 is described below
commit 7abd4990384fbda82dde78f10a12457a7b110cd3
Author: lilai23 <[email protected]>
AuthorDate: Wed Dec 30 14:10:49 2020 +0800
dumpcache接口实现以及syncer代码优化 (#800)
1.实现mongo的dumpcache接口
2.syncer逻辑优化
3.existence接口请求bug修复
4.重构dumpcache接口
---
client/microservice.go | 2 +-
datasource/{system.go => common.go} | 15 ++-----
datasource/etcd/system.go | 4 +-
datasource/etcd/system_test.go | 4 +-
datasource/mongo/system.go | 34 +++++++++++++++-
datasource/mongo/system_test.go | 77 +++++++++++++++++++++++++++++++++++
datasource/system.go | 2 +-
server/rest/admin/service.go | 4 +-
syncer/grpc/grpc.go | 11 +++--
syncer/server/handler.go | 5 ++-
syncer/servicecenter/servicecenter.go | 22 ++++++----
syncer/servicecenter/sync.go | 21 ++++++----
12 files changed, 158 insertions(+), 43 deletions(-)
diff --git a/client/microservice.go b/client/microservice.go
index ae81678..a87bad0 100644
--- a/client/microservice.go
+++ b/client/microservice.go
@@ -94,7 +94,7 @@ func (c *Client) ServiceExistence(ctx context.Context,
domain, project string, a
query := url.Values{}
query.Set("type", "microservice")
query.Set("env", env)
- query.Set("appID", appID)
+ query.Set("appId", appID)
query.Set("serviceName", serviceName)
query.Set("version", versionRule)
diff --git a/datasource/system.go b/datasource/common.go
similarity index 70%
copy from datasource/system.go
copy to datasource/common.go
index 67771bf..b732eb8 100644
--- a/datasource/system.go
+++ b/datasource/common.go
@@ -17,15 +17,8 @@
package datasource
-import (
- "context"
-
- "github.com/apache/servicecomb-service-center/pkg/dump"
+const (
+ ServiceKeyPrefix = "/cse-sr/ms/files"
+ InstanceKeyPrefix = "/cse-sr/inst/files"
+ SPLIT = "/"
)
-
-// SystemManager contains the APIs of system management
-type SystemManager interface {
- DumpCache(ctx context.Context, cache *dump.Cache)
- DLock(ctx context.Context, request *DLockRequest) error
- DUnlock(ctx context.Context, request *DUnlockRequest) error
-}
diff --git a/datasource/etcd/system.go b/datasource/etcd/system.go
index 6c2b4b5..b7fdf64 100644
--- a/datasource/etcd/system.go
+++ b/datasource/etcd/system.go
@@ -29,7 +29,8 @@ import (
"github.com/apache/servicecomb-service-center/pkg/gopool"
)
-func (ds *DataSource) DumpCache(ctx context.Context, cache *dump.Cache) {
+func (ds *DataSource) DumpCache(ctx context.Context) *dump.Cache {
+ var cache dump.Cache
gopool.New(ctx, gopool.Configure().Workers(2)).
Do(func(_ context.Context) { setValue(kv.Store().Service(),
&cache.Microservices) }).
Do(func(_ context.Context) {
setValue(kv.Store().ServiceIndex(), &cache.Indexes) }).
@@ -41,6 +42,7 @@ func (ds *DataSource) DumpCache(ctx context.Context, cache
*dump.Cache) {
Do(func(_ context.Context) {
setValue(kv.Store().SchemaSummary(), &cache.Summaries) }).
Do(func(_ context.Context) { setValue(kv.Store().Instance(),
&cache.Instances) }).
Done()
+ return &cache
}
func setValue(e sd.Adaptor, setter dump.Setter) {
diff --git a/datasource/etcd/system_test.go b/datasource/etcd/system_test.go
index ae624d7..09e11ba 100644
--- a/datasource/etcd/system_test.go
+++ b/datasource/etcd/system_test.go
@@ -20,13 +20,11 @@ import (
"testing"
"github.com/apache/servicecomb-service-center/datasource"
- "github.com/apache/servicecomb-service-center/pkg/dump"
"github.com/stretchr/testify/assert"
)
func TestAdminService_Dump(t *testing.T) {
t.Log("execute 'dump' operation,when get all,should be passed")
- var cache dump.Cache
- datasource.Instance().DumpCache(getContext(), &cache)
+ cache := datasource.Instance().DumpCache(getContext())
assert.Equal(t, len(cache.Indexes), len(cache.Microservices))
}
diff --git a/datasource/mongo/system.go b/datasource/mongo/system.go
index f3d8ea8..3356920 100644
--- a/datasource/mongo/system.go
+++ b/datasource/mongo/system.go
@@ -21,11 +21,19 @@ import (
"context"
"github.com/apache/servicecomb-service-center/datasource"
+ "github.com/apache/servicecomb-service-center/datasource/mongo/sd"
"github.com/apache/servicecomb-service-center/pkg/dump"
+ "github.com/apache/servicecomb-service-center/pkg/gopool"
+ "github.com/apache/servicecomb-service-center/pkg/util"
)
-func (ds *DataSource) DumpCache(ctx context.Context, cache *dump.Cache) {
-
+func (ds *DataSource) DumpCache(ctx context.Context) *dump.Cache {
+ var cache dump.Cache
+ gopool.New(ctx, gopool.Configure().Workers(2)).
+ Do(func(_ context.Context) {
setServiceValue(sd.Store().Service(), &cache.Microservices) }).
+ Do(func(_ context.Context) {
setInstanceValue(sd.Store().Instance(), &cache.Instances) }).
+ Done()
+ return &cache
}
func (ds *DataSource) DLock(ctx context.Context, request
*datasource.DLockRequest) error {
@@ -35,3 +43,25 @@ func (ds *DataSource) DLock(ctx context.Context, request
*datasource.DLockReques
func (ds *DataSource) DUnlock(ctx context.Context, request
*datasource.DUnlockRequest) error {
return nil
}
+
+func setServiceValue(e *sd.MongoCacher, setter dump.Setter) {
+ e.Cache().ForEach(func(k string, kv interface{}) (next bool) {
+ setter.SetValue(&dump.KV{
+ Key:
util.StringJoin([]string{datasource.ServiceKeyPrefix, kv.(sd.Service).Domain,
+ kv.(sd.Service).Project, k}, datasource.SPLIT),
+ Value: kv.(sd.Service).ServiceInfo,
+ })
+ return true
+ })
+}
+
+func setInstanceValue(e *sd.MongoCacher, setter dump.Setter) {
+ e.Cache().ForEach(func(k string, kv interface{}) (next bool) {
+ setter.SetValue(&dump.KV{
+ Key:
util.StringJoin([]string{datasource.InstanceKeyPrefix, kv.(sd.Instance).Domain,
+ kv.(sd.Instance).Project,
kv.(sd.Instance).InstanceInfo.ServiceId, k}, datasource.SPLIT),
+ Value: kv.(sd.Instance).InstanceInfo,
+ })
+ return true
+ })
+}
diff --git a/datasource/mongo/system_test.go b/datasource/mongo/system_test.go
new file mode 100644
index 0000000..ea377ec
--- /dev/null
+++ b/datasource/mongo/system_test.go
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package mongo_test
+
+import (
+ "testing"
+
+ "github.com/apache/servicecomb-service-center/datasource"
+ "github.com/apache/servicecomb-service-center/datasource/mongo"
+ "github.com/apache/servicecomb-service-center/datasource/mongo/client"
+ "github.com/apache/servicecomb-service-center/datasource/mongo/sd"
+ pb "github.com/go-chassis/cari/discovery"
+ "github.com/go-chassis/go-chassis/v2/storage"
+ "github.com/stretchr/testify/assert"
+ "go.mongodb.org/mongo-driver/bson"
+)
+
+func init() {
+ config := storage.Options{
+ URI: "mongodb://localhost:27017",
+ }
+ client.NewMongoClient(config)
+ // clean the mongodb
+ client.GetMongoClient().Delete(getContext(), mongo.CollectionInstance,
bson.M{})
+ client.GetMongoClient().Delete(getContext(), mongo.CollectionService,
bson.M{})
+}
+
+func TestDumpCache(t *testing.T) {
+ var store = &sd.TypeStore{}
+ store.Initialize()
+ t.Run("Register service && instance, check dump, should pass", func(t
*testing.T) {
+ var serviceID string
+ service, err :=
datasource.Instance().RegisterService(getContext(), &pb.CreateServiceRequest{
+ Service: &pb.MicroService{
+ ServiceName: "create_service_test",
+ AppId: "create_service_appId",
+ Version: "1.0.0",
+ Level: "BACK",
+ Status: pb.MS_UP,
+ },
+ })
+ assert.NoError(t, err)
+ assert.Equal(t, pb.ResponseSuccess, service.Response.GetCode())
+ serviceID = service.ServiceId
+
+ instance, err :=
datasource.Instance().RegisterInstance(getContext(),
&pb.RegisterInstanceRequest{
+ Instance: &pb.MicroServiceInstance{
+ ServiceId: serviceID,
+ Endpoints: []string{
+ "localhost://127.0.0.1:8080",
+ },
+ HostName: "HOST_TEST",
+ Status: pb.MSI_UP,
+ },
+ })
+ assert.NoError(t, err)
+ assert.Equal(t, pb.ResponseSuccess, instance.Response.GetCode())
+
+ cache := datasource.Instance().DumpCache(getContext())
+ assert.NotNil(t, cache)
+ })
+}
diff --git a/datasource/system.go b/datasource/system.go
index 67771bf..153b1c7 100644
--- a/datasource/system.go
+++ b/datasource/system.go
@@ -25,7 +25,7 @@ import (
// SystemManager contains the APIs of system management
type SystemManager interface {
- DumpCache(ctx context.Context, cache *dump.Cache)
+ DumpCache(ctx context.Context) *dump.Cache
DLock(ctx context.Context, request *DLockRequest) error
DUnlock(ctx context.Context, request *DUnlockRequest) error
}
diff --git a/server/rest/admin/service.go b/server/rest/admin/service.go
index a5d9f35..299d04d 100644
--- a/server/rest/admin/service.go
+++ b/server/rest/admin/service.go
@@ -77,9 +77,7 @@ func (service *Service) dump(ctx context.Context, option
string, resp *dump.Resp
case "config":
resp.AppConfig = archaius.GetConfigs()
case "cache":
- var cache dump.Cache
- datasource.Instance().DumpCache(ctx, &cache)
- resp.Cache = &cache
+ resp.Cache = datasource.Instance().DumpCache(ctx)
case "all":
service.dump(ctx, "info", resp)
service.dump(ctx, "config", resp)
diff --git a/syncer/grpc/grpc.go b/syncer/grpc/grpc.go
index 6162ee6..b72b12d 100644
--- a/syncer/grpc/grpc.go
+++ b/syncer/grpc/grpc.go
@@ -19,6 +19,7 @@ package grpc
import (
"context"
+ "math"
"net"
"github.com/apache/servicecomb-service-center/pkg/log"
@@ -44,9 +45,9 @@ func NewServer(ops ...Option) (*Server, error) {
conf := toGRPCConfig(ops...)
var srv *grpc.Server
if conf.tlsConfig != nil {
- srv =
grpc.NewServer(grpc.Creds(credentials.NewTLS(conf.tlsConfig)))
+ srv =
grpc.NewServer(grpc.Creds(credentials.NewTLS(conf.tlsConfig)),
grpc.MaxRecvMsgSize(math.MaxInt32), grpc.MaxSendMsgSize(math.MaxInt32))
} else {
- srv = grpc.NewServer()
+ srv = grpc.NewServer(grpc.MaxRecvMsgSize(math.MaxInt32),
grpc.MaxSendMsgSize(math.MaxInt32))
}
rpc.RegisterGRpcServer(srv)
@@ -120,9 +121,11 @@ func InjectClient(injection func(conn *grpc.ClientConn),
ops ...Option) error {
var err error
if conf.tlsConfig != nil {
- conn, err = grpc.Dial(conf.addr,
grpc.WithTransportCredentials(credentials.NewTLS(conf.tlsConfig)))
+ conn, err = grpc.Dial(conf.addr,
grpc.WithTransportCredentials(credentials.NewTLS(conf.tlsConfig)),
+
grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(math.MaxInt32),
grpc.MaxCallSendMsgSize(math.MaxInt32)))
} else {
- conn, err = grpc.Dial(conf.addr, grpc.WithInsecure())
+ conn, err = grpc.Dial(conf.addr, grpc.WithInsecure(),
+
grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(math.MaxInt32),
grpc.MaxCallSendMsgSize(math.MaxInt32)))
}
if err != nil {
diff --git a/syncer/server/handler.go b/syncer/server/handler.go
index 0716db4..fae5434 100644
--- a/syncer/server/handler.go
+++ b/syncer/server/handler.go
@@ -46,8 +46,6 @@ func (s *Server) tickHandler() {
return
}
log.Debugf("Handle Tick")
- // Flush data to the storage of servicecenter
- s.servicecenter.FlushData()
// sends a UserEvent on Serf, the event will be broadcast between
members
s.mux.Lock()
@@ -200,6 +198,9 @@ func (s *Server) incrementUserEvent(data ...[]byte)
(success bool) {
}
func (s *Server) notifyUserEvent(data ...[]byte) (success bool) {
+ // Flush data to the storage of servicecenter
+ s.servicecenter.FlushData()
+
err := s.serf.UserEvent(EventDiscovered,
util.StringToBytesWithNoCopy(s.conf.Cluster))
if err != nil {
log.Error("Syncer send discovered user event failed", err)
diff --git a/syncer/servicecenter/servicecenter.go
b/syncer/servicecenter/servicecenter.go
index 1c4bd66..25bf651 100644
--- a/syncer/servicecenter/servicecenter.go
+++ b/syncer/servicecenter/servicecenter.go
@@ -84,16 +84,19 @@ func (s *servicecenter) Registry(clusterName string, data
*pb.SyncData) {
svc := searchService(inst, data.Services)
if svc == nil {
err := errors.New("service does not exist")
- log.Errorf(err, "servicecenter.Registry, serviceID =
%s, instanceId = %s", inst.ServiceId, inst.InstanceId)
+ log.Error(fmt.Sprintf("servicecenter.Registry,
serviceID = %s, instanceId = %s", inst.ServiceId, inst.InstanceId), err)
continue
}
// If the svc is in the mapping, just do nothing, if not,
created it in servicecenter and get the new serviceID
- svcID := s.createService(svc)
- log.Debugf("create service success orgServiceID= %s,
curServiceID = %s", inst.ServiceId, svcID)
+ svcID, err := s.createService(svc)
+ if err != nil {
+ log.Error("create service failed", err)
+ continue
+ }
// If inst is in the mapping, just heart beat it in
servicecenter
- log.Debugf("trying to do registration of instance, instanceID =
%s", inst.InstanceId)
+ log.Debug(fmt.Sprintf("trying to do registration of instance,
instanceID = %s", inst.InstanceId))
if s.heartbeatInstances(mapping, inst) {
continue
}
@@ -136,10 +139,6 @@ func (s *servicecenter) IncrementRegistry(clusterName
string, data *pb.SyncData)
continue
}
- // If the svc is in the mapping, just do nothing, if not,
created it in servicecenter and get the new serviceID
- svcID := s.createService(svc)
- log.Debug(fmt.Sprintf("create service success orgServiceID= %s,
curServiceID = %s", inst.ServiceId, svcID))
-
matches := pb.Expansions(inst.Expansions).Find("action",
map[string]string{})
if len(matches) != 1 {
err := utils.ErrActionInvalid
@@ -149,6 +148,13 @@ func (s *servicecenter) IncrementRegistry(clusterName
string, data *pb.SyncData)
action := string(matches[0].Bytes[:])
if action == string(discovery.EVT_CREATE) {
+ // If the svc is in the mapping, just do nothing, if
not, created it in servicecenter and get the new serviceID
+ svcID, err := s.createService(svc)
+ if err != nil {
+ log.Error("create service failed", err)
+ continue
+ }
+
log.Debug(fmt.Sprintf("trying to do registration of
instance, instanceID = %s", inst.InstanceId))
// If inst is in the mapping, just heart beat it in
servicecenter
diff --git a/syncer/servicecenter/sync.go b/syncer/servicecenter/sync.go
index f7dcb0e..9eed8a9 100644
--- a/syncer/servicecenter/sync.go
+++ b/syncer/servicecenter/sync.go
@@ -19,6 +19,7 @@ package servicecenter
import (
"context"
+ "fmt"
"github.com/apache/servicecomb-service-center/pkg/log"
pb "github.com/apache/servicecomb-service-center/syncer/proto"
@@ -40,19 +41,25 @@ func (s *servicecenter) heartbeatInstances(mapping
pb.SyncMapping, instance *pb.
return true
}
-func (s *servicecenter) createService(service *pb.SyncService) string {
+func (s *servicecenter) createService(service *pb.SyncService) (string, error)
{
ctx := context.Background()
- serviceID, _ := s.servicecenter.ServiceExistence(ctx,
service.DomainProject, service)
+ serviceID, err := s.servicecenter.ServiceExistence(ctx,
service.DomainProject, service)
+ if err != nil {
+ log.Error("get service existence failed", err)
+ return "", err
+ }
+
if serviceID == "" {
- var err error
serviceID, err = s.servicecenter.CreateService(ctx,
service.DomainProject, service)
if err != nil {
- log.Errorf(err, "Servicecenter create service failed")
- return ""
+ log.Error("create service failed", err)
+ return "", err
}
- log.Debugf("Create service successful, serviceID = %s",
serviceID)
+ log.Debug(fmt.Sprintf("create service successful, serviceID =
%s", serviceID))
+ } else {
+ log.Debug(fmt.Sprintf("service already exists, serviceID = %s",
serviceID))
}
- return serviceID
+ return serviceID, nil
}
func (s *servicecenter) registryInstances(domainProject, serviceID string,
instance *pb.SyncInstance) string {