This is an automated email from the ASF dual-hosted git repository.

littlecui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/servicecomb-service-center.git

commit 1a4c2df7282f26084ed0982a5b1746370fce02d0
Author: little-cui <[email protected]>
AuthorDate: Tue Nov 13 15:41:22 2018 +0800

    SCB-993 Fixed the error of microservice registration in multi-dc
---
 server/admin/admin_suite_test.go                 |  3 ++-
 server/core/microservice.go                      | 12 +++++----
 server/govern/govern_suite_test.go               |  3 ++-
 server/handler/cache/cache.go                    |  4 +++
 server/plugin/pkg/discovery/aggregate/adaptor.go | 10 +++-----
 server/plugin/pkg/discovery/aggregate/common.go  | 17 +++++++++++--
 server/plugin/pkg/discovery/aggregate/indexer.go | 31 ++++++++++++++++++++++++
 server/plugin/pkg/registry/common.go             |  2 ++
 server/plugin/pkg/registry/config.go             |  2 +-
 server/plugin/pkg/registry/option.go             |  5 ++++
 server/service/util/common.go                    |  9 ++++---
 server/service/util/util.go                      |  3 +++
 12 files changed, 80 insertions(+), 21 deletions(-)

diff --git a/server/admin/admin_suite_test.go b/server/admin/admin_suite_test.go
index 8053859..9c11b87 100644
--- a/server/admin/admin_suite_test.go
+++ b/server/admin/admin_suite_test.go
@@ -20,6 +20,7 @@ import _ "github.com/apache/servicecomb-service-center/server/init"
 import _ "github.com/apache/servicecomb-service-center/server/bootstrap"
 import (
        "github.com/apache/servicecomb-service-center/pkg/util"
+       serviceUtil "github.com/apache/servicecomb-service-center/server/service/util"
        "github.com/astaxie/beego"
        . "github.com/onsi/ginkgo"
        "github.com/onsi/ginkgo/reporters"
@@ -45,5 +46,5 @@ var _ = BeforeSuite(func() {
 func getContext() context.Context {
        return util.SetContext(
                util.SetDomainProject(context.Background(), "default", "default"),
-               "noCache", "1")
+               serviceUtil.CTX_NOCACHE, "1")
 }
diff --git a/server/core/microservice.go b/server/core/microservice.go
index 25765a1..9331e2c 100644
--- a/server/core/microservice.go
+++ b/server/core/microservice.go
@@ -45,7 +45,8 @@ const (
        REGISTRY_DEFAULT_LEASE_RENEWALINTERVAL int32 = 30
        REGISTRY_DEFAULT_LEASE_RETRYTIMES      int32 = 3
 
-       IS_SC_SELF = "sc_self"
+       CTX_SC_SELF     = "_sc_self"
+       CTX_SC_REGISTRY = "_registryOnly"
 )
 
 func init() {
@@ -85,9 +86,10 @@ func prepareSelfRegistration() {
 }
 
 func AddDefaultContextValue(ctx context.Context) context.Context {
-       return util.SetContext(
-               util.SetDomainProject(ctx, REGISTRY_DOMAIN, REGISTRY_PROJECT),
-               IS_SC_SELF, true)
+       return util.SetContext(util.SetContext(util.SetDomainProject(ctx,
+               REGISTRY_DOMAIN, REGISTRY_PROJECT),
+               CTX_SC_SELF, true),
+               CTX_SC_REGISTRY, "1")
 }
 
 func IsDefaultDomainProject(domainProject string) bool {
@@ -116,7 +118,7 @@ func IsShared(key *pb.MicroServiceKey) bool {
 }
 
 func IsSCInstance(ctx context.Context) bool {
-       b, _ := ctx.Value(IS_SC_SELF).(bool)
+       b, _ := ctx.Value(CTX_SC_SELF).(bool)
        return b
 }
 
diff --git a/server/govern/govern_suite_test.go b/server/govern/govern_suite_test.go
index 2b20ef0..0954f22 100644
--- a/server/govern/govern_suite_test.go
+++ b/server/govern/govern_suite_test.go
@@ -25,6 +25,7 @@ import (
        pb "github.com/apache/servicecomb-service-center/server/core/proto"
        "github.com/apache/servicecomb-service-center/server/govern"
        "github.com/apache/servicecomb-service-center/server/service"
+       serviceUtil "github.com/apache/servicecomb-service-center/server/service/util"
        "github.com/astaxie/beego"
        . "github.com/onsi/ginkgo"
        "github.com/onsi/ginkgo/reporters"
@@ -54,5 +55,5 @@ var _ = BeforeSuite(func() {
 func getContext() context.Context {
        return util.SetContext(
                util.SetDomainProject(context.Background(), "default", "default"),
-               "noCache", "1")
+               serviceUtil.CTX_NOCACHE, "1")
 }
diff --git a/server/handler/cache/cache.go b/server/handler/cache/cache.go
index 7bda9d4..4bb6fd5 100644
--- a/server/handler/cache/cache.go
+++ b/server/handler/cache/cache.go
@@ -32,6 +32,10 @@ func (l *CacheResponse) Handle(i *chain.Invocation) {
        r := i.Context().Value(rest.CTX_REQUEST).(*http.Request)
        query := r.URL.Query()
 
+       if r.Method != http.MethodGet {
+               i.WithContext(serviceUtil.CTX_REGISTRYONLY, "1")
+       }
+
        noCache := query.Get(serviceUtil.CTX_NOCACHE) == "1"
        if noCache {
                i.WithContext(serviceUtil.CTX_NOCACHE, "1")
diff --git a/server/plugin/pkg/discovery/aggregate/adaptor.go b/server/plugin/pkg/discovery/aggregate/adaptor.go
index 6efb2c5..e56e7c3 100644
--- a/server/plugin/pkg/discovery/aggregate/adaptor.go
+++ b/server/plugin/pkg/discovery/aggregate/adaptor.go
@@ -26,6 +26,7 @@ import (
 // Aggregator is a discovery service adaptor implement of one registry cluster
 type Aggregator struct {
        discovery.Indexer
+       Type     discovery.Type
        Adaptors []discovery.Adaptor
 }
 
@@ -81,21 +82,16 @@ func getLogConflictFunc(t discovery.Type) func(origin, conflict *discovery.KeyVa
 }
 
 func NewAggregator(t discovery.Type, cfg *discovery.Config) *Aggregator {
-       as := &Aggregator{}
+       as := &Aggregator{Type: t}
        for _, name := range repos {
                repo := mgr.Plugins().Get(mgr.DISCOVERY, name).New().(discovery.AdaptorRepository)
                as.Adaptors = append(as.Adaptors, repo.New(t, cfg))
        }
+       as.Indexer = NewAggregatorIndexer(as)
 
        switch t {
-       case backend.SCHEMA:
-               // schema does not been cached, so new the adaptor indexer
-               as.Indexer = NewAdaptorsIndexer(as.Adaptors)
        case backend.SERVICE_INDEX, backend.SERVICE_ALIAS:
                NewConflictChecker(as.Cache(), getLogConflictFunc(t))
-               fallthrough
-       default:
-               as.Indexer = discovery.NewCacheIndexer(as.Cache())
        }
        return as
 }
diff --git a/server/plugin/pkg/discovery/aggregate/common.go b/server/plugin/pkg/discovery/aggregate/common.go
index 28c383f..20f43d9 100644
--- a/server/plugin/pkg/discovery/aggregate/common.go
+++ b/server/plugin/pkg/discovery/aggregate/common.go
@@ -27,8 +27,9 @@ const (
 )
 
 var (
-       closedCh = make(chan struct{})
-       repos    []string
+       closedCh      = make(chan struct{})
+       repos         []string
+       registryIndex = 0
 )
 
 func init() {
@@ -41,4 +42,16 @@ func init() {
        modes := beego.AppConfig.DefaultString("aggregate_mode", AggregateModes)
        repos = strings.Split(modes, ",")
        log.Infof("aggregate_mode is %s", repos)
+
+       // Record the index of the registry plugin if it appears in the modes list;
+       // it is used to pick the one writable registry that handles requests
+       // from the API layer.
+       registry := beego.AppConfig.String("registry_plugin")
+       for i, repo := range repos {
+               if repo == registry {
+                       registryIndex = i
+                       log.Infof("found the registry index is %d", registryIndex)
+                       break
+               }
+       }
 }
diff --git a/server/plugin/pkg/discovery/aggregate/indexer.go b/server/plugin/pkg/discovery/aggregate/indexer.go
index 98c692f..912c997 100644
--- a/server/plugin/pkg/discovery/aggregate/indexer.go
+++ b/server/plugin/pkg/discovery/aggregate/indexer.go
@@ -17,6 +17,7 @@ package aggregate
 
 import (
        "github.com/apache/servicecomb-service-center/pkg/util"
+       "github.com/apache/servicecomb-service-center/server/core/backend"
        "github.com/apache/servicecomb-service-center/server/plugin/pkg/discovery"
        "github.com/apache/servicecomb-service-center/server/plugin/pkg/registry"
        "golang.org/x/net/context"
@@ -51,3 +52,33 @@ func (i *AdaptorsIndexer) Search(ctx context.Context, opts ...registry.PluginOpO
 func NewAdaptorsIndexer(as []discovery.Adaptor) *AdaptorsIndexer {
        return &AdaptorsIndexer{Adaptors: as}
 }
+
+type AggregatorIndexer struct {
+       Indexer  discovery.Indexer
+       Registry discovery.Indexer
+}
+
+func (i *AggregatorIndexer) Search(ctx context.Context, opts ...registry.PluginOpOption) (*discovery.Response, error) {
+       op := registry.OptionsToOp(opts...)
+       if op.RegistryOnly {
+               return i.Registry.Search(ctx, opts...)
+       }
+
+       return i.Indexer.Search(ctx, opts...)
+}
+
+func NewAggregatorIndexer(as *Aggregator) *AggregatorIndexer {
+       ai := &AggregatorIndexer{}
+       switch as.Type {
+       case backend.SCHEMA:
+               // schema is not cached
+               ai.Indexer = NewAdaptorsIndexer(as.Adaptors)
+       default:
+               ai.Indexer = discovery.NewCacheIndexer(as.Cache())
+       }
+       ai.Registry = ai.Indexer
+       if registryIndex >= 0 {
+               ai.Registry = as.Adaptors[registryIndex]
+       }
+       return ai
+}
diff --git a/server/plugin/pkg/registry/common.go b/server/plugin/pkg/registry/common.go
index fd7efb9..6509c71 100644
--- a/server/plugin/pkg/registry/common.go
+++ b/server/plugin/pkg/registry/common.go
@@ -58,6 +58,8 @@ const (
        // the timeout dial to etcd
        defaultDialTimeout    = 10 * time.Second
        defaultRequestTimeout = 30 * time.Second
+
+       defaultClusterName = "default"
 )
 
 func WithTimeout(ctx context.Context) (context.Context, context.CancelFunc) {
diff --git a/server/plugin/pkg/registry/config.go b/server/plugin/pkg/registry/config.go
index 34ab1ac..60f1701 100644
--- a/server/plugin/pkg/registry/config.go
+++ b/server/plugin/pkg/registry/config.go
@@ -84,7 +84,7 @@ func (c *Config) RegistryAddresses() []string {
 func Configuration() *Config {
        configOnce.Do(func() {
                var err error
-               defaultRegistryConfig.ClusterName = beego.AppConfig.DefaultString("manager_name", "default")
+               defaultRegistryConfig.ClusterName = beego.AppConfig.DefaultString("manager_name", defaultClusterName)
                defaultRegistryConfig.ManagerAddress = beego.AppConfig.String("manager_addr")
                defaultRegistryConfig.ClusterAddresses = beego.AppConfig.DefaultString("manager_cluster", "http://127.0.0.1:2379")
                defaultRegistryConfig.InitClusters()
diff --git a/server/plugin/pkg/registry/option.go b/server/plugin/pkg/registry/option.go
index 2f33987..f81a22c 100644
--- a/server/plugin/pkg/registry/option.go
+++ b/server/plugin/pkg/registry/option.go
@@ -37,6 +37,7 @@ type PluginOp struct {
        WatchCallback WatchCallback
        Offset        int64
        Limit         int64
+       RegistryOnly  bool
 }
 
 func (op PluginOp) String() string {
@@ -87,6 +88,9 @@ func (op PluginOp) FormatUrlParams() string {
        if op.Limit > 0 {
                buf.WriteString(fmt.Sprintf("&limit=%d", op.Limit))
        }
+       if op.RegistryOnly {
+               buf.WriteString("&registryOnly=true")
+       }
        return buf.String()
 }
 
@@ -107,6 +111,7 @@ func WithPrevKv() PluginOpOption             { return func(op *PluginOp) { op.Pr
 func WithLease(leaseID int64) PluginOpOption { return func(op *PluginOp) { op.Lease = leaseID } }
 func WithKeyOnly() PluginOpOption            { return func(op *PluginOp) { op.KeyOnly = true } }
 func WithCountOnly() PluginOpOption          { return func(op *PluginOp) { op.CountOnly = true } }
+func WithRegistryOnly() PluginOpOption       { return func(op *PluginOp) { op.RegistryOnly = true } }
 func WithNoneOrder() PluginOpOption          { return func(op *PluginOp) { op.SortOrder = SORT_NONE } }
 func WithAscendOrder() PluginOpOption        { return func(op *PluginOp) { op.SortOrder = SORT_ASCEND } }
 func WithDescendOrder() PluginOpOption       { return func(op *PluginOp) { op.SortOrder = SORT_DESCEND } }
diff --git a/server/service/util/common.go b/server/service/util/common.go
index 6c25e51..0b182aa 100644
--- a/server/service/util/common.go
+++ b/server/service/util/common.go
@@ -18,8 +18,9 @@ package util
 
 const (
        HEADER_REV            = "X-Resource-Revision"
-       CTX_NOCACHE           = "noCache"
-       CTX_CACHEONLY         = "cacheOnly"
-       CTX_REQUEST_REVISION  = "requestRev"
-       CTX_RESPONSE_REVISION = "responseRev"
+       CTX_REGISTRYONLY      = "_registryOnly"
+       CTX_NOCACHE           = "_noCache"
+       CTX_CACHEONLY         = "_cacheOnly"
+       CTX_REQUEST_REVISION  = "_requestRev"
+       CTX_RESPONSE_REVISION = "_responseRev"
 )
diff --git a/server/service/util/util.go b/server/service/util/util.go
index 9a01d0b..2184b29 100644
--- a/server/service/util/util.go
+++ b/server/service/util/util.go
@@ -29,5 +29,8 @@ func FromContext(ctx context.Context) []registry.PluginOpOption {
        case ctx.Value(CTX_CACHEONLY) == "1":
                opts = append(opts, registry.WithCacheOnly())
        }
+       if ctx.Value(CTX_REGISTRYONLY) == "1" {
+               opts = append(opts, registry.WithRegistryOnly())
+       }
        return opts
 }

Reply via email to