This is an automated email from the ASF dual-hosted git repository. littlecui pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/servicecomb-service-center.git
commit 1a4c2df7282f26084ed0982a5b1746370fce02d0 Author: little-cui <[email protected]> AuthorDate: Tue Nov 13 15:41:22 2018 +0800 SCB-993 Fixed the error of microservice registration in multi-dc --- server/admin/admin_suite_test.go | 3 ++- server/core/microservice.go | 12 +++++---- server/govern/govern_suite_test.go | 3 ++- server/handler/cache/cache.go | 4 +++ server/plugin/pkg/discovery/aggregate/adaptor.go | 10 +++----- server/plugin/pkg/discovery/aggregate/common.go | 17 +++++++++++-- server/plugin/pkg/discovery/aggregate/indexer.go | 31 ++++++++++++++++++++++++ server/plugin/pkg/registry/common.go | 2 ++ server/plugin/pkg/registry/config.go | 2 +- server/plugin/pkg/registry/option.go | 5 ++++ server/service/util/common.go | 9 ++++--- server/service/util/util.go | 3 +++ 12 files changed, 80 insertions(+), 21 deletions(-) diff --git a/server/admin/admin_suite_test.go b/server/admin/admin_suite_test.go index 8053859..9c11b87 100644 --- a/server/admin/admin_suite_test.go +++ b/server/admin/admin_suite_test.go @@ -20,6 +20,7 @@ import _ "github.com/apache/servicecomb-service-center/server/init" import _ "github.com/apache/servicecomb-service-center/server/bootstrap" import ( "github.com/apache/servicecomb-service-center/pkg/util" + serviceUtil "github.com/apache/servicecomb-service-center/server/service/util" "github.com/astaxie/beego" . 
"github.com/onsi/ginkgo" "github.com/onsi/ginkgo/reporters" @@ -45,5 +46,5 @@ var _ = BeforeSuite(func() { func getContext() context.Context { return util.SetContext( util.SetDomainProject(context.Background(), "default", "default"), - "noCache", "1") + serviceUtil.CTX_NOCACHE, "1") } diff --git a/server/core/microservice.go b/server/core/microservice.go index 25765a1..9331e2c 100644 --- a/server/core/microservice.go +++ b/server/core/microservice.go @@ -45,7 +45,8 @@ const ( REGISTRY_DEFAULT_LEASE_RENEWALINTERVAL int32 = 30 REGISTRY_DEFAULT_LEASE_RETRYTIMES int32 = 3 - IS_SC_SELF = "sc_self" + CTX_SC_SELF = "_sc_self" + CTX_SC_REGISTRY = "_registryOnly" ) func init() { @@ -85,9 +86,10 @@ func prepareSelfRegistration() { } func AddDefaultContextValue(ctx context.Context) context.Context { - return util.SetContext( - util.SetDomainProject(ctx, REGISTRY_DOMAIN, REGISTRY_PROJECT), - IS_SC_SELF, true) + return util.SetContext(util.SetContext(util.SetDomainProject(ctx, + REGISTRY_DOMAIN, REGISTRY_PROJECT), + CTX_SC_SELF, true), + CTX_SC_REGISTRY, "1") } func IsDefaultDomainProject(domainProject string) bool { @@ -116,7 +118,7 @@ func IsShared(key *pb.MicroServiceKey) bool { } func IsSCInstance(ctx context.Context) bool { - b, _ := ctx.Value(IS_SC_SELF).(bool) + b, _ := ctx.Value(CTX_SC_SELF).(bool) return b } diff --git a/server/govern/govern_suite_test.go b/server/govern/govern_suite_test.go index 2b20ef0..0954f22 100644 --- a/server/govern/govern_suite_test.go +++ b/server/govern/govern_suite_test.go @@ -25,6 +25,7 @@ import ( pb "github.com/apache/servicecomb-service-center/server/core/proto" "github.com/apache/servicecomb-service-center/server/govern" "github.com/apache/servicecomb-service-center/server/service" + serviceUtil "github.com/apache/servicecomb-service-center/server/service/util" "github.com/astaxie/beego" . 
"github.com/onsi/ginkgo" "github.com/onsi/ginkgo/reporters" @@ -54,5 +55,5 @@ var _ = BeforeSuite(func() { func getContext() context.Context { return util.SetContext( util.SetDomainProject(context.Background(), "default", "default"), - "noCache", "1") + serviceUtil.CTX_NOCACHE, "1") } diff --git a/server/handler/cache/cache.go b/server/handler/cache/cache.go index 7bda9d4..4bb6fd5 100644 --- a/server/handler/cache/cache.go +++ b/server/handler/cache/cache.go @@ -32,6 +32,10 @@ func (l *CacheResponse) Handle(i *chain.Invocation) { r := i.Context().Value(rest.CTX_REQUEST).(*http.Request) query := r.URL.Query() + if r.Method != http.MethodGet { + i.WithContext(serviceUtil.CTX_REGISTRYONLY, "1") + } + noCache := query.Get(serviceUtil.CTX_NOCACHE) == "1" if noCache { i.WithContext(serviceUtil.CTX_NOCACHE, "1") diff --git a/server/plugin/pkg/discovery/aggregate/adaptor.go b/server/plugin/pkg/discovery/aggregate/adaptor.go index 6efb2c5..e56e7c3 100644 --- a/server/plugin/pkg/discovery/aggregate/adaptor.go +++ b/server/plugin/pkg/discovery/aggregate/adaptor.go @@ -26,6 +26,7 @@ import ( // Aggregator is a discovery service adaptor implement of one registry cluster type Aggregator struct { discovery.Indexer + Type discovery.Type Adaptors []discovery.Adaptor } @@ -81,21 +82,16 @@ func getLogConflictFunc(t discovery.Type) func(origin, conflict *discovery.KeyVa } func NewAggregator(t discovery.Type, cfg *discovery.Config) *Aggregator { - as := &Aggregator{} + as := &Aggregator{Type: t} for _, name := range repos { repo := mgr.Plugins().Get(mgr.DISCOVERY, name).New().(discovery.AdaptorRepository) as.Adaptors = append(as.Adaptors, repo.New(t, cfg)) } + as.Indexer = NewAggregatorIndexer(as) switch t { - case backend.SCHEMA: - // schema does not been cached, so new the adaptor indexer - as.Indexer = NewAdaptorsIndexer(as.Adaptors) case backend.SERVICE_INDEX, backend.SERVICE_ALIAS: NewConflictChecker(as.Cache(), getLogConflictFunc(t)) - fallthrough - default: - as.Indexer = 
discovery.NewCacheIndexer(as.Cache()) } return as } diff --git a/server/plugin/pkg/discovery/aggregate/common.go b/server/plugin/pkg/discovery/aggregate/common.go index 28c383f..20f43d9 100644 --- a/server/plugin/pkg/discovery/aggregate/common.go +++ b/server/plugin/pkg/discovery/aggregate/common.go @@ -27,8 +27,9 @@ const ( ) var ( - closedCh = make(chan struct{}) - repos []string + closedCh = make(chan struct{}) + repos []string + registryIndex = 0 ) func init() { @@ -41,4 +42,16 @@ func init() { modes := beego.AppConfig.DefaultString("aggregate_mode", AggregateModes) repos = strings.Split(modes, ",") log.Infof("aggregate_mode is %s", repos) + + // here save the index if found the registry plugin in modes list, + // it is used for getting the one writable registry to handle requests + // from API layer. + registry := beego.AppConfig.String("registry_plugin") + for i, repo := range repos { + if repo == registry { + registryIndex = i + log.Infof("found the registry index is %d", registryIndex) + break + } + } } diff --git a/server/plugin/pkg/discovery/aggregate/indexer.go b/server/plugin/pkg/discovery/aggregate/indexer.go index 98c692f..912c997 100644 --- a/server/plugin/pkg/discovery/aggregate/indexer.go +++ b/server/plugin/pkg/discovery/aggregate/indexer.go @@ -17,6 +17,7 @@ package aggregate import ( "github.com/apache/servicecomb-service-center/pkg/util" + "github.com/apache/servicecomb-service-center/server/core/backend" "github.com/apache/servicecomb-service-center/server/plugin/pkg/discovery" "github.com/apache/servicecomb-service-center/server/plugin/pkg/registry" "golang.org/x/net/context" @@ -51,3 +52,33 @@ func (i *AdaptorsIndexer) Search(ctx context.Context, opts ...registry.PluginOpO func NewAdaptorsIndexer(as []discovery.Adaptor) *AdaptorsIndexer { return &AdaptorsIndexer{Adaptors: as} } + +type AggregatorIndexer struct { + Indexer discovery.Indexer + Registry discovery.Indexer +} + +func (i *AggregatorIndexer) Search(ctx context.Context, opts 
...registry.PluginOpOption) (*discovery.Response, error) { + op := registry.OptionsToOp(opts...) + if op.RegistryOnly { + return i.Registry.Search(ctx, opts...) + } + + return i.Indexer.Search(ctx, opts...) +} + +func NewAggregatorIndexer(as *Aggregator) *AggregatorIndexer { + ai := &AggregatorIndexer{} + switch as.Type { + case backend.SCHEMA: + // schema does not been cached + ai.Indexer = NewAdaptorsIndexer(as.Adaptors) + default: + ai.Indexer = discovery.NewCacheIndexer(as.Cache()) + } + ai.Registry = ai.Indexer + if registryIndex >= 0 { + ai.Registry = as.Adaptors[registryIndex] + } + return ai +} diff --git a/server/plugin/pkg/registry/common.go b/server/plugin/pkg/registry/common.go index fd7efb9..6509c71 100644 --- a/server/plugin/pkg/registry/common.go +++ b/server/plugin/pkg/registry/common.go @@ -58,6 +58,8 @@ const ( // the timeout dial to etcd defaultDialTimeout = 10 * time.Second defaultRequestTimeout = 30 * time.Second + + defaultClusterName = "default" ) func WithTimeout(ctx context.Context) (context.Context, context.CancelFunc) { diff --git a/server/plugin/pkg/registry/config.go b/server/plugin/pkg/registry/config.go index 34ab1ac..60f1701 100644 --- a/server/plugin/pkg/registry/config.go +++ b/server/plugin/pkg/registry/config.go @@ -84,7 +84,7 @@ func (c *Config) RegistryAddresses() []string { func Configuration() *Config { configOnce.Do(func() { var err error - defaultRegistryConfig.ClusterName = beego.AppConfig.DefaultString("manager_name", "default") + defaultRegistryConfig.ClusterName = beego.AppConfig.DefaultString("manager_name", defaultClusterName) defaultRegistryConfig.ManagerAddress = beego.AppConfig.String("manager_addr") defaultRegistryConfig.ClusterAddresses = beego.AppConfig.DefaultString("manager_cluster", "http://127.0.0.1:2379") defaultRegistryConfig.InitClusters() diff --git a/server/plugin/pkg/registry/option.go b/server/plugin/pkg/registry/option.go index 2f33987..f81a22c 100644 --- a/server/plugin/pkg/registry/option.go +++ 
b/server/plugin/pkg/registry/option.go @@ -37,6 +37,7 @@ type PluginOp struct { WatchCallback WatchCallback Offset int64 Limit int64 + RegistryOnly bool } func (op PluginOp) String() string { @@ -87,6 +88,9 @@ func (op PluginOp) FormatUrlParams() string { if op.Limit > 0 { buf.WriteString(fmt.Sprintf("&limit=%d", op.Limit)) } + if op.RegistryOnly { + buf.WriteString("&registryOnly=true") + } return buf.String() } @@ -107,6 +111,7 @@ func WithPrevKv() PluginOpOption { return func(op *PluginOp) { op.Pr func WithLease(leaseID int64) PluginOpOption { return func(op *PluginOp) { op.Lease = leaseID } } func WithKeyOnly() PluginOpOption { return func(op *PluginOp) { op.KeyOnly = true } } func WithCountOnly() PluginOpOption { return func(op *PluginOp) { op.CountOnly = true } } +func WithRegistryOnly() PluginOpOption { return func(op *PluginOp) { op.RegistryOnly = true } } func WithNoneOrder() PluginOpOption { return func(op *PluginOp) { op.SortOrder = SORT_NONE } } func WithAscendOrder() PluginOpOption { return func(op *PluginOp) { op.SortOrder = SORT_ASCEND } } func WithDescendOrder() PluginOpOption { return func(op *PluginOp) { op.SortOrder = SORT_DESCEND } } diff --git a/server/service/util/common.go b/server/service/util/common.go index 6c25e51..0b182aa 100644 --- a/server/service/util/common.go +++ b/server/service/util/common.go @@ -18,8 +18,9 @@ package util const ( HEADER_REV = "X-Resource-Revision" - CTX_NOCACHE = "noCache" - CTX_CACHEONLY = "cacheOnly" - CTX_REQUEST_REVISION = "requestRev" - CTX_RESPONSE_REVISION = "responseRev" + CTX_REGISTRYONLY = "_registryOnly" + CTX_NOCACHE = "_noCache" + CTX_CACHEONLY = "_cacheOnly" + CTX_REQUEST_REVISION = "_requestRev" + CTX_RESPONSE_REVISION = "_responseRev" ) diff --git a/server/service/util/util.go b/server/service/util/util.go index 9a01d0b..2184b29 100644 --- a/server/service/util/util.go +++ b/server/service/util/util.go @@ -29,5 +29,8 @@ func FromContext(ctx context.Context) []registry.PluginOpOption { case 
ctx.Value(CTX_CACHEONLY) == "1": opts = append(opts, registry.WithCacheOnly()) } + if ctx.Value(CTX_REGISTRYONLY) == "1" { + opts = append(opts, registry.WithRegistryOnly()) + } return opts }
