[ 
https://issues.apache.org/jira/browse/SCB-993?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16686646#comment-16686646
 ] 

ASF GitHub Bot commented on SCB-993:
------------------------------------

little-cui closed pull request #487: SCB-993 Fixed the error of microservice 
registration in multi-dc
URL: https://github.com/apache/servicecomb-service-center/pull/487
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/server/admin/admin_suite_test.go b/server/admin/admin_suite_test.go
index 8053859a..9c11b874 100644
--- a/server/admin/admin_suite_test.go
+++ b/server/admin/admin_suite_test.go
@@ -20,6 +20,7 @@ import _ 
"github.com/apache/servicecomb-service-center/server/init"
 import _ "github.com/apache/servicecomb-service-center/server/bootstrap"
 import (
        "github.com/apache/servicecomb-service-center/pkg/util"
+       serviceUtil 
"github.com/apache/servicecomb-service-center/server/service/util"
        "github.com/astaxie/beego"
        . "github.com/onsi/ginkgo"
        "github.com/onsi/ginkgo/reporters"
@@ -45,5 +46,5 @@ var _ = BeforeSuite(func() {
 func getContext() context.Context {
        return util.SetContext(
                util.SetDomainProject(context.Background(), "default", 
"default"),
-               "noCache", "1")
+               serviceUtil.CTX_NOCACHE, "1")
 }
diff --git a/server/core/microservice.go b/server/core/microservice.go
index 25765a1a..9331e2c1 100644
--- a/server/core/microservice.go
+++ b/server/core/microservice.go
@@ -45,7 +45,8 @@ const (
        REGISTRY_DEFAULT_LEASE_RENEWALINTERVAL int32 = 30
        REGISTRY_DEFAULT_LEASE_RETRYTIMES      int32 = 3
 
-       IS_SC_SELF = "sc_self"
+       CTX_SC_SELF     = "_sc_self"
+       CTX_SC_REGISTRY = "_registryOnly"
 )
 
 func init() {
@@ -85,9 +86,10 @@ func prepareSelfRegistration() {
 }
 
 func AddDefaultContextValue(ctx context.Context) context.Context {
-       return util.SetContext(
-               util.SetDomainProject(ctx, REGISTRY_DOMAIN, REGISTRY_PROJECT),
-               IS_SC_SELF, true)
+       return util.SetContext(util.SetContext(util.SetDomainProject(ctx,
+               REGISTRY_DOMAIN, REGISTRY_PROJECT),
+               CTX_SC_SELF, true),
+               CTX_SC_REGISTRY, "1")
 }
 
 func IsDefaultDomainProject(domainProject string) bool {
@@ -116,7 +118,7 @@ func IsShared(key *pb.MicroServiceKey) bool {
 }
 
 func IsSCInstance(ctx context.Context) bool {
-       b, _ := ctx.Value(IS_SC_SELF).(bool)
+       b, _ := ctx.Value(CTX_SC_SELF).(bool)
        return b
 }
 
diff --git a/server/govern/govern_suite_test.go 
b/server/govern/govern_suite_test.go
index 2b20ef0d..0954f22f 100644
--- a/server/govern/govern_suite_test.go
+++ b/server/govern/govern_suite_test.go
@@ -25,6 +25,7 @@ import (
        pb "github.com/apache/servicecomb-service-center/server/core/proto"
        "github.com/apache/servicecomb-service-center/server/govern"
        "github.com/apache/servicecomb-service-center/server/service"
+       serviceUtil 
"github.com/apache/servicecomb-service-center/server/service/util"
        "github.com/astaxie/beego"
        . "github.com/onsi/ginkgo"
        "github.com/onsi/ginkgo/reporters"
@@ -54,5 +55,5 @@ var _ = BeforeSuite(func() {
 func getContext() context.Context {
        return util.SetContext(
                util.SetDomainProject(context.Background(), "default", 
"default"),
-               "noCache", "1")
+               serviceUtil.CTX_NOCACHE, "1")
 }
diff --git a/server/handler/cache/cache.go b/server/handler/cache/cache.go
index 7bda9d4f..4bb6fd5d 100644
--- a/server/handler/cache/cache.go
+++ b/server/handler/cache/cache.go
@@ -32,6 +32,10 @@ func (l *CacheResponse) Handle(i *chain.Invocation) {
        r := i.Context().Value(rest.CTX_REQUEST).(*http.Request)
        query := r.URL.Query()
 
+       if r.Method != http.MethodGet {
+               i.WithContext(serviceUtil.CTX_REGISTRYONLY, "1")
+       }
+
        noCache := query.Get(serviceUtil.CTX_NOCACHE) == "1"
        if noCache {
                i.WithContext(serviceUtil.CTX_NOCACHE, "1")
diff --git a/server/plugin/pkg/discovery/aggregate/adaptor.go 
b/server/plugin/pkg/discovery/aggregate/adaptor.go
index 6efb2c5c..e56e7c38 100644
--- a/server/plugin/pkg/discovery/aggregate/adaptor.go
+++ b/server/plugin/pkg/discovery/aggregate/adaptor.go
@@ -26,6 +26,7 @@ import (
 // Aggregator is a discovery service adaptor implement of one registry cluster
 type Aggregator struct {
        discovery.Indexer
+       Type     discovery.Type
        Adaptors []discovery.Adaptor
 }
 
@@ -81,21 +82,16 @@ func getLogConflictFunc(t discovery.Type) func(origin, 
conflict *discovery.KeyVa
 }
 
 func NewAggregator(t discovery.Type, cfg *discovery.Config) *Aggregator {
-       as := &Aggregator{}
+       as := &Aggregator{Type: t}
        for _, name := range repos {
                repo := mgr.Plugins().Get(mgr.DISCOVERY, 
name).New().(discovery.AdaptorRepository)
                as.Adaptors = append(as.Adaptors, repo.New(t, cfg))
        }
+       as.Indexer = NewAggregatorIndexer(as)
 
        switch t {
-       case backend.SCHEMA:
-               // schema does not been cached, so new the adaptor indexer
-               as.Indexer = NewAdaptorsIndexer(as.Adaptors)
        case backend.SERVICE_INDEX, backend.SERVICE_ALIAS:
                NewConflictChecker(as.Cache(), getLogConflictFunc(t))
-               fallthrough
-       default:
-               as.Indexer = discovery.NewCacheIndexer(as.Cache())
        }
        return as
 }
diff --git a/server/plugin/pkg/discovery/aggregate/common.go 
b/server/plugin/pkg/discovery/aggregate/common.go
index 28c383f7..20f43d93 100644
--- a/server/plugin/pkg/discovery/aggregate/common.go
+++ b/server/plugin/pkg/discovery/aggregate/common.go
@@ -27,8 +27,9 @@ const (
 )
 
 var (
-       closedCh = make(chan struct{})
-       repos    []string
+       closedCh      = make(chan struct{})
+       repos         []string
+       registryIndex = 0
 )
 
 func init() {
@@ -41,4 +42,16 @@ func init() {
        modes := beego.AppConfig.DefaultString("aggregate_mode", AggregateModes)
        repos = strings.Split(modes, ",")
        log.Infof("aggregate_mode is %s", repos)
+
+       // here save the index if found the registry plugin in modes list,
+       // it is used for getting the one writable registry to handle requests
+       // from API layer.
+       registry := beego.AppConfig.String("registry_plugin")
+       for i, repo := range repos {
+               if repo == registry {
+                       registryIndex = i
+                       log.Infof("found the registry index is %d", 
registryIndex)
+                       break
+               }
+       }
 }
diff --git a/server/plugin/pkg/discovery/aggregate/indexer.go 
b/server/plugin/pkg/discovery/aggregate/indexer.go
index 98c692f6..912c9977 100644
--- a/server/plugin/pkg/discovery/aggregate/indexer.go
+++ b/server/plugin/pkg/discovery/aggregate/indexer.go
@@ -17,6 +17,7 @@ package aggregate
 
 import (
        "github.com/apache/servicecomb-service-center/pkg/util"
+       "github.com/apache/servicecomb-service-center/server/core/backend"
        
"github.com/apache/servicecomb-service-center/server/plugin/pkg/discovery"
        
"github.com/apache/servicecomb-service-center/server/plugin/pkg/registry"
        "golang.org/x/net/context"
@@ -51,3 +52,33 @@ func (i *AdaptorsIndexer) Search(ctx context.Context, opts 
...registry.PluginOpO
 func NewAdaptorsIndexer(as []discovery.Adaptor) *AdaptorsIndexer {
        return &AdaptorsIndexer{Adaptors: as}
 }
+
+type AggregatorIndexer struct {
+       Indexer  discovery.Indexer
+       Registry discovery.Indexer
+}
+
+func (i *AggregatorIndexer) Search(ctx context.Context, opts 
...registry.PluginOpOption) (*discovery.Response, error) {
+       op := registry.OptionsToOp(opts...)
+       if op.RegistryOnly {
+               return i.Registry.Search(ctx, opts...)
+       }
+
+       return i.Indexer.Search(ctx, opts...)
+}
+
+func NewAggregatorIndexer(as *Aggregator) *AggregatorIndexer {
+       ai := &AggregatorIndexer{}
+       switch as.Type {
+       case backend.SCHEMA:
+               // schema does not been cached
+               ai.Indexer = NewAdaptorsIndexer(as.Adaptors)
+       default:
+               ai.Indexer = discovery.NewCacheIndexer(as.Cache())
+       }
+       ai.Registry = ai.Indexer
+       if registryIndex >= 0 {
+               ai.Registry = as.Adaptors[registryIndex]
+       }
+       return ai
+}
diff --git a/server/plugin/pkg/discovery/servicecenter/aggregate.go 
b/server/plugin/pkg/discovery/servicecenter/aggregate.go
index 09417cd3..1cb097b0 100644
--- a/server/plugin/pkg/discovery/servicecenter/aggregate.go
+++ b/server/plugin/pkg/discovery/servicecenter/aggregate.go
@@ -53,29 +53,19 @@ func (c *SCClientAggregate) GetScCache() (*model.Cache, 
map[string]error) {
                if caches == nil {
                        caches = &model.Cache{}
                }
-
-               c.cacheAppend(client.Cfg.Name, &caches.Microservices, 
&cache.Microservices)
-               c.cacheAppend(client.Cfg.Name, &caches.Indexes, &cache.Indexes)
-               c.cacheAppend(client.Cfg.Name, &caches.Aliases, &cache.Aliases)
-               c.cacheAppend(client.Cfg.Name, &caches.Tags, &cache.Tags)
-               c.cacheAppend(client.Cfg.Name, &caches.Rules, &cache.Rules)
-               c.cacheAppend(client.Cfg.Name, &caches.RuleIndexes, 
&cache.RuleIndexes)
-               c.cacheAppend(client.Cfg.Name, &caches.DependencyRules, 
&cache.DependencyRules)
-               c.cacheAppend(client.Cfg.Name, &caches.Summaries, 
&cache.Summaries)
-               c.cacheAppend(client.Cfg.Name, &caches.Instances, 
&cache.Instances)
+               caches.Microservices = append(caches.Microservices, 
cache.Microservices...)
+               caches.Indexes = append(caches.Indexes, cache.Indexes...)
+               caches.Aliases = append(caches.Aliases, cache.Aliases...)
+               caches.Tags = append(caches.Tags, cache.Tags...)
+               caches.Rules = append(caches.Rules, cache.Rules...)
+               caches.RuleIndexes = append(caches.RuleIndexes, 
cache.RuleIndexes...)
+               caches.DependencyRules = append(caches.DependencyRules, 
cache.DependencyRules...)
+               caches.Summaries = append(caches.Summaries, cache.Summaries...)
+               caches.Instances = append(caches.Instances, cache.Instances...)
        }
        return caches, errs
 }
 
-func (c *SCClientAggregate) cacheAppend(name string, setter model.Setter, 
getter model.Getter) {
-       getter.ForEach(func(_ int, v *model.KV) bool {
-               // overwrite the cluster from remote to local
-               v.ClusterName = name
-               setter.SetValue(v)
-               return true
-       })
-}
-
 func (c *SCClientAggregate) GetSchemasByServiceId(domainProject, serviceId 
string) ([]*pb.Schema, *scerr.Error) {
        var schemas []*pb.Schema
        for _, client := range *c {
diff --git a/server/plugin/pkg/discovery/servicecenter/aggregate_test.go 
b/server/plugin/pkg/discovery/servicecenter/aggregate_test.go
new file mode 100644
index 00000000..e906c2bf
--- /dev/null
+++ b/server/plugin/pkg/discovery/servicecenter/aggregate_test.go
@@ -0,0 +1,30 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package servicecenter
+
+import (
+       
"github.com/apache/servicecomb-service-center/server/plugin/pkg/registry"
+       "testing"
+)
+
+func TestNewSCClientAggregate(t *testing.T) {
+       registry.Configuration().ClusterAddresses = 
"sc-1=127.0.0.1:2379,127.0.0.2:2379"
+       registry.Configuration().InitClusters()
+       c := NewSCClientAggregate()
+       if len(*c) == 0 {
+               t.Fatalf("TestNewSCClientAggregate failed")
+       }
+}
diff --git a/server/plugin/pkg/discovery/servicecenter/indexer.go 
b/server/plugin/pkg/discovery/servicecenter/indexer.go
index 3af42833..746cfa34 100644
--- a/server/plugin/pkg/discovery/servicecenter/indexer.go
+++ b/server/plugin/pkg/discovery/servicecenter/indexer.go
@@ -139,6 +139,12 @@ func (c *ClusterIndexer) checkWithConflictHandleFunc(local 
*ServiceCenterCacher,
        conflictHandleFunc func(origin *model.KV, conflict model.Getter, index 
int)) {
        exists := make(map[string]*model.KV)
        remote.ForEach(func(i int, v *model.KV) bool {
+               // because the result of the remote return may contain the same 
data as
+               // the local cache of the current SC. So we need to ignore it 
and
+               // prevent the aggregation result from increasing.
+               if v.ClusterName == registry.Configuration().ClusterName {
+                       return true
+               }
                if kv, ok := exists[v.Key]; ok {
                        conflictHandleFunc(kv, remote, i)
                        return true
@@ -157,8 +163,8 @@ func (c *ClusterIndexer) checkWithConflictHandleFunc(local 
*ServiceCenterCacher,
                        newKv.CreateRevision = v.Rev
                        local.Notify(pb.EVT_CREATE, v.Key, newKv)
                case kv.ModRevision != v.Rev:
-                       // if lose some cluster kvs, then skip to notify 
changes of this cluster
-                       // to prevent publish the wrong changes events of kvs
+                       // if connect to some cluster failed, then skip to 
notify changes
+                       // of these clusters to prevent publish the wrong 
changes events of kvs.
                        if err, ok := skipClusters[kv.ClusterName]; ok {
                                log.Errorf(err, "cluster[%s] temporarily 
unavailable, skip cluster[%s] event %s %s",
                                        kv.ClusterName, v.ClusterName, 
pb.EVT_UPDATE, v.Key)
@@ -175,6 +181,9 @@ func (c *ClusterIndexer) checkWithConflictHandleFunc(local 
*ServiceCenterCacher,
        local.Cache().ForEach(func(key string, v *discovery.KeyValue) (next 
bool) {
                var exist bool
                remote.ForEach(func(_ int, v *model.KV) bool {
+                       if v.ClusterName == 
registry.Configuration().ClusterName {
+                               return true
+                       }
                        exist = v.Key == key
                        return !exist
                })
diff --git a/server/plugin/pkg/discovery/servicecenter/indexer_test.go 
b/server/plugin/pkg/discovery/servicecenter/indexer_test.go
new file mode 100644
index 00000000..6d622172
--- /dev/null
+++ b/server/plugin/pkg/discovery/servicecenter/indexer_test.go
@@ -0,0 +1,157 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package servicecenter
+
+import (
+       "fmt"
+       "github.com/apache/servicecomb-service-center/pkg/log"
+       "github.com/apache/servicecomb-service-center/server/admin/model"
+       "github.com/apache/servicecomb-service-center/server/core/proto"
+       
"github.com/apache/servicecomb-service-center/server/plugin/pkg/discovery"
+       
"github.com/apache/servicecomb-service-center/server/plugin/pkg/registry"
+       "testing"
+)
+
+func TestClusterIndexer_Sync(t *testing.T) {
+       indexer := &ClusterIndexer{}
+       cache := discovery.NewKvCache("test", discovery.Configure())
+       cfg := discovery.Configure()
+       sccacher := NewServiceCenterCacher(cfg, cache)
+       arr := model.MicroserviceIndexSlice{}
+
+       // case: sync empty data
+       cfg.WithEventFunc(func(discovery.KvEvent) {
+               t.Fatalf("TestClusterIndexer_Sync failed")
+       })
+       indexer.checkWithConflictHandleFunc(sccacher, &arr, nil, 
func(*model.KV, model.Getter, int) {
+               t.Fatalf("TestClusterIndexer_Sync failed")
+       })
+
+       // case: CREATE
+       cfg.WithEventFunc(func(evt discovery.KvEvent) {
+               if evt.Type != proto.EVT_CREATE {
+                       t.Fatalf("TestClusterIndexer_Sync failed, %v", evt)
+               }
+               fmt.Println(evt)
+       })
+       arr = model.MicroserviceIndexSlice{}
+       arr.SetValue(&model.KV{Key: "/a", Value: "a", Rev: 1, ClusterName: "a"})
+       indexer.checkWithConflictHandleFunc(sccacher, &arr, nil, 
func(*model.KV, model.Getter, int) {
+               t.Fatalf("TestClusterIndexer_Sync failed")
+       })
+
+       // case: UPDATE
+       cfg.WithEventFunc(func(evt discovery.KvEvent) {
+               if evt.Type != proto.EVT_UPDATE {
+                       t.Fatalf("TestClusterIndexer_Sync failed, %v", evt)
+               }
+               fmt.Println(evt)
+       })
+       arr = model.MicroserviceIndexSlice{}
+       arr.SetValue(&model.KV{Key: "/a", Value: "aa", Rev: 2, ClusterName: 
"a"})
+       indexer.checkWithConflictHandleFunc(sccacher, &arr, nil, func(kv 
*model.KV, _ model.Getter, _ int) {
+               t.Fatalf("TestClusterIndexer_Sync failed %v", kv)
+       })
+
+       // case: UPDATE the same one
+       cfg.WithEventFunc(func(evt discovery.KvEvent) {
+               t.Fatalf("TestClusterIndexer_Sync failed, %v", evt)
+       })
+       indexer.checkWithConflictHandleFunc(sccacher, &arr, nil, 
func(*model.KV, model.Getter, int) {
+               t.Fatalf("TestClusterIndexer_Sync failed")
+       })
+
+       // case: conflict but not print log
+       cfg.WithEventFunc(func(evt discovery.KvEvent) {
+               t.Fatalf("TestClusterIndexer_Sync failed, %v", evt)
+       })
+       arr = model.MicroserviceIndexSlice{}
+       arr.SetValue(&model.KV{Key: "/a", Value: "aa", Rev: 2, ClusterName: 
"a"})
+       arr.SetValue(&model.KV{Key: "/a", Value: "aa", Rev: 2, ClusterName: 
"b"})
+       indexer.checkWithConflictHandleFunc(sccacher, &arr, nil, 
indexer.logConflictFunc)
+
+       // case: conflict and print log
+       func() {
+               defer log.Recover()
+               cfg.WithEventFunc(func(evt discovery.KvEvent) {
+                       t.Fatalf("TestClusterIndexer_Sync failed, %v", evt)
+               })
+               arr = model.MicroserviceIndexSlice{}
+               arr.SetValue(&model.KV{Key: "/a", Value: "aa", Rev: 2, 
ClusterName: "a"})
+               arr.SetValue(&model.KV{Key: "/a", Value: "ab", Rev: 2, 
ClusterName: "b"})
+               indexer.checkWithConflictHandleFunc(sccacher, &arr, nil, 
indexer.logConflictFunc)
+               // '/a' is incorrect key and logConflictFunc will be excepted 
to panic here
+               t.Fatalf("TestClusterIndexer_Sync failed")
+       }()
+
+       // case: some cluster err and do not overwrite the cache
+       cfg.WithEventFunc(func(evt discovery.KvEvent) {
+               t.Fatalf("TestClusterIndexer_Sync failed, %v", evt)
+       })
+       arr = model.MicroserviceIndexSlice{}
+       arr.SetValue(&model.KV{Key: "/a", Value: "ab", Rev: 3, ClusterName: 
"b"})
+       indexer.checkWithConflictHandleFunc(sccacher, &arr, 
map[string]error{"a": fmt.Errorf("error")}, func(kv *model.KV, _ model.Getter, 
_ int) {
+               t.Fatalf("TestClusterIndexer_Sync failed %v", kv)
+       })
+
+       // case: DELETE but the cluster err
+       cfg.WithEventFunc(func(evt discovery.KvEvent) {
+               t.Fatalf("TestClusterIndexer_Sync failed, %v", evt)
+       })
+       arr = model.MicroserviceIndexSlice{}
+       indexer.checkWithConflictHandleFunc(sccacher, &arr, 
map[string]error{"a": fmt.Errorf("error")}, func(kv *model.KV, _ model.Getter, 
_ int) {
+               t.Fatalf("TestClusterIndexer_Sync failed %v", kv)
+       })
+
+       // case: DELETE
+       cfg.WithEventFunc(func(evt discovery.KvEvent) {
+               fmt.Println(evt)
+               if evt.Type != proto.EVT_DELETE {
+                       t.Fatalf("TestClusterIndexer_Sync failed, %v", evt)
+               }
+       })
+       arr = model.MicroserviceIndexSlice{}
+       indexer.checkWithConflictHandleFunc(sccacher, &arr, nil, func(kv 
*model.KV, _ model.Getter, _ int) {
+               t.Fatalf("TestClusterIndexer_Sync failed %v", kv)
+       })
+
+       // case: CREATE again and set cluster to local cluster name
+       cfg.WithEventFunc(func(evt discovery.KvEvent) {
+               if evt.Type != proto.EVT_CREATE {
+                       t.Fatalf("TestClusterIndexer_Sync failed, %v", evt)
+               }
+               fmt.Println(evt)
+       })
+       arr = model.MicroserviceIndexSlice{}
+       arr.SetValue(&model.KV{Key: "/a", Value: "a", Rev: 1, ClusterName: 
registry.Configuration().ClusterName})
+       indexer.checkWithConflictHandleFunc(sccacher, &arr, nil, 
func(*model.KV, model.Getter, int) {
+               t.Fatalf("TestClusterIndexer_Sync failed")
+       })
+
+       // case: UPDATE but skip local cluster
+       cfg.WithEventFunc(func(evt discovery.KvEvent) {
+               if evt.Type != proto.EVT_UPDATE && evt.KV.Value != "aa" {
+                       t.Fatalf("TestClusterIndexer_Sync failed, %v", evt)
+               }
+               fmt.Println(evt)
+       })
+       arr = model.MicroserviceIndexSlice{}
+       arr.SetValue(&model.KV{Key: "/a", Value: "x", Rev: 2, ClusterName: 
registry.Configuration().ClusterName})
+       arr.SetValue(&model.KV{Key: "/a", Value: "aa", Rev: 2, ClusterName: 
"a"})
+       indexer.checkWithConflictHandleFunc(sccacher, &arr, nil, func(kv 
*model.KV, _ model.Getter, _ int) {
+               t.Fatalf("TestClusterIndexer_Sync failed %v", kv)
+       })
+}
diff --git a/server/plugin/pkg/registry/common.go 
b/server/plugin/pkg/registry/common.go
index fd7efb99..6509c715 100644
--- a/server/plugin/pkg/registry/common.go
+++ b/server/plugin/pkg/registry/common.go
@@ -58,6 +58,8 @@ const (
        // the timeout dial to etcd
        defaultDialTimeout    = 10 * time.Second
        defaultRequestTimeout = 30 * time.Second
+
+       defaultClusterName = "default"
 )
 
 func WithTimeout(ctx context.Context) (context.Context, context.CancelFunc) {
diff --git a/server/plugin/pkg/registry/config.go 
b/server/plugin/pkg/registry/config.go
index 34ab1ac8..60f1701a 100644
--- a/server/plugin/pkg/registry/config.go
+++ b/server/plugin/pkg/registry/config.go
@@ -84,7 +84,7 @@ func (c *Config) RegistryAddresses() []string {
 func Configuration() *Config {
        configOnce.Do(func() {
                var err error
-               defaultRegistryConfig.ClusterName = 
beego.AppConfig.DefaultString("manager_name", "default")
+               defaultRegistryConfig.ClusterName = 
beego.AppConfig.DefaultString("manager_name", defaultClusterName)
                defaultRegistryConfig.ManagerAddress = 
beego.AppConfig.String("manager_addr")
                defaultRegistryConfig.ClusterAddresses = 
beego.AppConfig.DefaultString("manager_cluster", "http://127.0.0.1:2379")
                defaultRegistryConfig.InitClusters()
diff --git a/server/plugin/pkg/registry/option.go 
b/server/plugin/pkg/registry/option.go
index 2f33987b..f81a22c1 100644
--- a/server/plugin/pkg/registry/option.go
+++ b/server/plugin/pkg/registry/option.go
@@ -37,6 +37,7 @@ type PluginOp struct {
        WatchCallback WatchCallback
        Offset        int64
        Limit         int64
+       RegistryOnly  bool
 }
 
 func (op PluginOp) String() string {
@@ -87,6 +88,9 @@ func (op PluginOp) FormatUrlParams() string {
        if op.Limit > 0 {
                buf.WriteString(fmt.Sprintf("&limit=%d", op.Limit))
        }
+       if op.RegistryOnly {
+               buf.WriteString("&registryOnly=true")
+       }
        return buf.String()
 }
 
@@ -107,6 +111,7 @@ func WithPrevKv() PluginOpOption             { return 
func(op *PluginOp) { op.Pr
 func WithLease(leaseID int64) PluginOpOption { return func(op *PluginOp) { 
op.Lease = leaseID } }
 func WithKeyOnly() PluginOpOption            { return func(op *PluginOp) { 
op.KeyOnly = true } }
 func WithCountOnly() PluginOpOption          { return func(op *PluginOp) { 
op.CountOnly = true } }
+func WithRegistryOnly() PluginOpOption       { return func(op *PluginOp) { 
op.RegistryOnly = true } }
 func WithNoneOrder() PluginOpOption          { return func(op *PluginOp) { 
op.SortOrder = SORT_NONE } }
 func WithAscendOrder() PluginOpOption        { return func(op *PluginOp) { 
op.SortOrder = SORT_ASCEND } }
 func WithDescendOrder() PluginOpOption       { return func(op *PluginOp) { 
op.SortOrder = SORT_DESCEND } }
diff --git a/server/service/util/common.go b/server/service/util/common.go
index 6c25e513..0b182aa2 100644
--- a/server/service/util/common.go
+++ b/server/service/util/common.go
@@ -18,8 +18,9 @@ package util
 
 const (
        HEADER_REV            = "X-Resource-Revision"
-       CTX_NOCACHE           = "noCache"
-       CTX_CACHEONLY         = "cacheOnly"
-       CTX_REQUEST_REVISION  = "requestRev"
-       CTX_RESPONSE_REVISION = "responseRev"
+       CTX_REGISTRYONLY      = "_registryOnly"
+       CTX_NOCACHE           = "_noCache"
+       CTX_CACHEONLY         = "_cacheOnly"
+       CTX_REQUEST_REVISION  = "_requestRev"
+       CTX_RESPONSE_REVISION = "_responseRev"
 )
diff --git a/server/service/util/util.go b/server/service/util/util.go
index 9a01d0b3..2184b29d 100644
--- a/server/service/util/util.go
+++ b/server/service/util/util.go
@@ -29,5 +29,8 @@ func FromContext(ctx context.Context) 
[]registry.PluginOpOption {
        case ctx.Value(CTX_CACHEONLY) == "1":
                opts = append(opts, registry.WithCacheOnly())
        }
+       if ctx.Value(CTX_REGISTRYONLY) == "1" {
+               opts = append(opts, registry.WithRegistryOnly())
+       }
        return opts
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Bug fixes
> ---------
>
>                 Key: SCB-993
>                 URL: https://issues.apache.org/jira/browse/SCB-993
>             Project: Apache ServiceComb
>          Issue Type: Bug
>          Components: Service-Center
>            Reporter: little-cui
>            Assignee: little-cui
>            Priority: Major
>             Fix For: service-center-1.1.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to