This is an automated email from the ASF dual-hosted git repository.

asifdxtreme pushed a commit to branch master
in repository 
https://gitbox.apache.org/repos/asf/incubator-servicecomb-service-center.git


The following commit(s) were added to refs/heads/master by this push:
     new 3fe59ed  SCB-953 Support sync distinct Kubernetes service types to 
service-center (#458)
3fe59ed is described below

commit 3fe59edb7c6de5c58fb19aeb7b29a1158124e03e
Author: little-cui <[email protected]>
AuthorDate: Thu Oct 18 17:02:06 2018 +0800

    SCB-953 Support sync distinct Kubernetes service types to service-center 
(#458)
    
    * Bug fixes
    
    * Should remove the instances when the register service annotation set false
    
    * Update docs
    
    * remove the duplicate cache
    
    * Parse multiple endpoints from one registry
    
    * SC Aggregate support enable TLS
---
 README.md                                          |  10 +++--
 docs/README.md                                     |   2 +-
 docs/helm.md                                       |  20 ++--------
 docs/multidcs.md                                   |   9 +++++
 docs/multidcs2.PNG                                 | Bin 0 -> 109260 bytes
 .../k8s/service-center/templates/configmap.yaml    |   3 +-
 .../k8s/service-center/templates/deployment.yaml   |   3 +-
 .../k8s/service-center/templates/service.yaml      |   3 +-
 server/admin/service.go                            |   5 +++
 server/plugin/pkg/discovery/aggregate/adaptor.go   |  14 ++++++-
 .../pkg/discovery/k8s/adaptor/cacher_index.go      |  13 +++---
 .../pkg/discovery/k8s/adaptor/cacher_instance.go   |  44 +++++++++++----------
 .../pkg/discovery/k8s/adaptor/cacher_service.go    |  16 +++++---
 server/plugin/pkg/discovery/k8s/adaptor/common.go  |  16 ++++++++
 .../pkg/discovery/k8s/adaptor/kube_client.go       |  13 ++++++
 .../pkg/discovery/servicecenter/aggregate.go       |  22 +++++++++++
 .../plugin/pkg/discovery/servicecenter/cluster.go  |   6 ++-
 server/plugin/pkg/registry/config.go               |  36 ++++++++++++-----
 server/plugin/pkg/registry/etcd/etcd.go            |   2 +-
 server/plugin/pkg/registry/etcd/etcd_test.go       |  44 +++++++++++++++++++++
 20 files changed, 210 insertions(+), 71 deletions(-)

diff --git a/README.md b/README.md
index e2fd849..ecdf7d9 100644
--- a/README.md
+++ b/README.md
@@ -7,7 +7,7 @@ Apache ServiceComb (incubating) Service-Center is a Restful 
based service-regist
  - **[`Open API`](/server/core/swagger/v4.yaml)**: API doc(Open API format) 
management for microservice
  - **Metadata**: Metadata management for both microservice and microservice 
instance
  - **Dependency**: Microservice dependency management
- - **Seperated**: Seperated microservice and microservice instance entity 
management
+ - **Separated**: Separated microservice and microservice instance entity 
management
  - **Domains**: Logical multiple domains management
  - **Security**: White and back list configuration for service discovery
  - **Discovery**: Support query instance by criteria 
@@ -17,8 +17,12 @@ Apache ServiceComb (incubating) Service-Center is a Restful 
based service-regist
  - **Performance**: Performance/Caching design
  - **[`Metrics`](/docs/integration-grafana.md)**: Able to expose Prometheus 
metric API automatically
  - **[`Tracing`](/docs/tracing.md)**: Able to report tracing data to Zipkin 
server
- - **[`Multi Datacenter`](/docs/multidcs.md)**: Additional layer of 
abstraction to clusters deployed in multiple datacenters
- - **[`Dynmaic Plug-in`](/docs/plugin.md)**: Able to load custom 
authentication, tls and other dynamic libraries
+ - **[`Pluginable`](/docs/plugin.md)**: Able to load custom authentication, 
tls and other dynamic libraries
+ - **[`CLI`](/scctl/pkg/plugin/README.md)**: Easy to control service center
+ - **[`Kubernetes`](/docs/kubeclusters.md)**: Embrace kubernetes ecosystem and 
support multi cluster service discovery
+ - **[`Datacenters`](/docs/multidcs.md)**: Additional layer of abstraction to 
clusters deployed in multiple datacenters
+ - **[`Aggregation`](/docs/aggregate.md)**: Able to aggregate microservices 
from multiple registry platforms and
+    support platform registry and client side registry at the same time
  
 ## Documentation
 
diff --git a/docs/README.md b/docs/README.md
index c827fb4..0d9d719 100644
--- a/docs/README.md
+++ b/docs/README.md
@@ -30,7 +30,7 @@ Swagger 
[`v4`](/server/core/swagger/v4.yaml)|[`v3`](/server/core/swagger/v3.yaml
 + **Kubernetes**
   - [Helm Chart](/docs/helm.md)
   - [Access Distinct Clusters](/docs/kubeclusters.md)
-  - [Multiple Platforms Micro-services Manage](/docs/aggregate.md)
+  - [Multiple Registry Aggregation](/docs/aggregate.md)
 
 #### Monitoring Service-Center
 
diff --git a/docs/helm.md b/docs/helm.md
index 9c04507..405ea11 100644
--- a/docs/helm.md
+++ b/docs/helm.md
@@ -72,25 +72,11 @@ cd ${PROJECT_ROOT}/examples/infrastructures/k8s
 # install etcd cluster
 helm install --name coreos --namespace default etcd/
 # install sc cluster
-helm install --name dc1 --namespace default \
-    --set frontend.deployment=false \
-    --set sc.image.repository="desktop-0028:5000/servicecomb/service-center" \
-    --set sc.discovery.type="aggregate" \
-    --set sc.discovery.aggregate="etcd\,servicecenter" \
-    --set sc.discovery.clusters="sc-1=http://c1-etcd-client:2379\,sc-2=http://dc2-service-center:30100" \
-    --set sc.registry.enabled=true \
-    --set sc.registry.type="etcd" \
-    --set sc.registry.name="sc-1" \
-    service-center/
-helm install --name dc2 --namespace default \
-    --set frontend.deployment=false \
-    --set sc.image.repository="desktop-0028:5000/servicecomb/service-center" \
-    --set sc.discovery.type="aggregate" \
-    --set sc.discovery.aggregate="etcd\,servicecenter" \
-    --set sc.discovery.clusters="sc-2=http://c2-etcd-client:2379\,sc-1=http://dc1-service-center:30100" \
+helm install --name servicecomb --namespace default \
+    --set sc.discovery.type="etcd" \
+    --set sc.discovery.clusters="http://coreos-etcd-client:2379" \
     --set sc.registry.enabled=true \
     --set sc.registry.type="etcd" \
-    --set sc.registry.name="sc-2" \
     service-center/
 ```
 
diff --git a/docs/multidcs.md b/docs/multidcs.md
index ecba0ea..3354ff6 100644
--- a/docs/multidcs.md
+++ b/docs/multidcs.md
@@ -14,6 +14,15 @@ isolated from each other. Another implementation of the 
discovery plug-in, `Serv
 access multiple SC instances and periodically pull up micro-service instance 
information so that if some
 micro-services can request aggregate, cross-DCs can be implemented using the 
same API as SC cluster.
 
+If SC aggregate is not deployed globally, SC also supports another way to 
implement multiple DCs discovery,
+as shown below.
+
+![architecture](/docs/multidcs2.PNG)
+
+The difference between the two approaches is that global deployment aggregate 
can divert service discovery traffic,
+the whole architecture is more like a read-write separation architecture, and 
the SC of each DC manage microservice
+information independently, which reduces the complexity. So we recommend the 
first architecture.
+
 ## Quick Start
 
 Let's assume you want to install 2 clusters of Service-Center in different DCs 
with following details.
diff --git a/docs/multidcs2.PNG b/docs/multidcs2.PNG
new file mode 100644
index 0000000..1190e4c
Binary files /dev/null and b/docs/multidcs2.PNG differ
diff --git 
a/examples/infrastructures/k8s/service-center/templates/configmap.yaml 
b/examples/infrastructures/k8s/service-center/templates/configmap.yaml
index 1a2c72d..56820b8 100644
--- a/examples/infrastructures/k8s/service-center/templates/configmap.yaml
+++ b/examples/infrastructures/k8s/service-center/templates/configmap.yaml
@@ -35,8 +35,7 @@ data:
     ssl_mode = 0
     enable_pprof = 1
 {{- end }}
-
-{{- if .Values.frontend.deployment -}}
+{{- if .Values.frontend.deployment }}
 ---
 apiVersion: v1
 kind: ConfigMap
diff --git 
a/examples/infrastructures/k8s/service-center/templates/deployment.yaml 
b/examples/infrastructures/k8s/service-center/templates/deployment.yaml
index 8bf7418..ddaf8f5 100644
--- a/examples/infrastructures/k8s/service-center/templates/deployment.yaml
+++ b/examples/infrastructures/k8s/service-center/templates/deployment.yaml
@@ -53,8 +53,7 @@ spec:
 {{ toYaml .Values.sc.nodeSelector | indent 8 }}
     {{- end }}
 {{- end }}
-
-{{- if .Values.frontend.deployment -}}
+{{- if .Values.frontend.deployment }}
 ---
 apiVersion: extensions/v1beta1
 kind: Deployment
diff --git a/examples/infrastructures/k8s/service-center/templates/service.yaml 
b/examples/infrastructures/k8s/service-center/templates/service.yaml
index 078b6df..f835ef6 100644
--- a/examples/infrastructures/k8s/service-center/templates/service.yaml
+++ b/examples/infrastructures/k8s/service-center/templates/service.yaml
@@ -24,8 +24,7 @@ spec:
     app: {{ template "service-center.name" . }}
     release: {{ .Release.Name }}
 {{- end }}
-
-{{- if .Values.frontend.deployment -}}
+{{- if .Values.frontend.deployment }}
 ---
 apiVersion: v1
 kind: Service
diff --git a/server/admin/service.go b/server/admin/service.go
index 2a86d80..6255f38 100644
--- a/server/admin/service.go
+++ b/server/admin/service.go
@@ -95,7 +95,12 @@ func (service *AdminService) dumpAll(ctx context.Context, 
cache *model.Cache) {
 }
 
 func setValue(e discovery.Adaptor, setter model.Setter) {
+       exists := make(map[string]struct{})
        e.Cache().ForEach(func(k string, kv *discovery.KeyValue) (next bool) {
+               if _, ok := exists[k]; ok {
+                       return true
+               }
+               exists[k] = struct{}{}
                setter.SetValue(&model.KV{Key: k, Rev: kv.ModRevision, Value: 
kv.Value})
                return true
        })
diff --git a/server/plugin/pkg/discovery/aggregate/adaptor.go 
b/server/plugin/pkg/discovery/aggregate/adaptor.go
index 7196d11..cb66980 100644
--- a/server/plugin/pkg/discovery/aggregate/adaptor.go
+++ b/server/plugin/pkg/discovery/aggregate/adaptor.go
@@ -16,6 +16,7 @@
 package aggregate
 
 import (
+       "github.com/apache/incubator-servicecomb-service-center/pkg/util"
        mgr 
"github.com/apache/incubator-servicecomb-service-center/server/plugin"
        
"github.com/apache/incubator-servicecomb-service-center/server/plugin/pkg/discovery"
        
"github.com/apache/incubator-servicecomb-service-center/server/plugin/pkg/registry"
@@ -26,13 +27,22 @@ import (
 type Aggregator []discovery.Adaptor
 
 func (as Aggregator) Search(ctx context.Context, opts 
...registry.PluginOpOption) (*discovery.Response, error) {
-       var response discovery.Response
+       var (
+               response discovery.Response
+               exists   = make(map[string]struct{})
+       )
        for _, a := range as {
                resp, err := a.Search(ctx, opts...)
                if err != nil {
                        continue
                }
-               response.Kvs = append(response.Kvs, resp.Kvs...)
+               for _, kv := range resp.Kvs {
+                       key := util.BytesToStringWithNoCopy(kv.Key)
+                       if _, ok := exists[key]; !ok {
+                               exists[key] = struct{}{}
+                               response.Kvs = append(response.Kvs, kv)
+                       }
+               }
                response.Count += resp.Count
        }
        return &response, nil
diff --git a/server/plugin/pkg/discovery/k8s/adaptor/cacher_index.go 
b/server/plugin/pkg/discovery/k8s/adaptor/cacher_index.go
index 2f0651c..b03a637 100644
--- a/server/plugin/pkg/discovery/k8s/adaptor/cacher_index.go
+++ b/server/plugin/pkg/discovery/k8s/adaptor/cacher_index.go
@@ -20,7 +20,6 @@ import (
        pb 
"github.com/apache/incubator-servicecomb-service-center/server/core/proto"
        
"github.com/apache/incubator-servicecomb-service-center/server/plugin/pkg/discovery"
        "k8s.io/api/core/v1"
-       meta "k8s.io/apimachinery/pkg/apis/meta/v1"
 )
 
 type ServiceIndexCacher struct {
@@ -30,14 +29,18 @@ type ServiceIndexCacher struct {
 // onServiceEvent is the method to refresh service cache
 func (c *ServiceIndexCacher) onServiceEvent(evt K8sEvent) {
        svc := evt.Object.(*v1.Service)
-       if svc.Namespace == meta.NamespaceSystem {
-               return
-       }
-
        domainProject := Kubernetes().GetDomainProject()
        serviceId := string(svc.UID)
        indexKey := 
core.GenerateServiceIndexKey(generateServiceKey(domainProject, svc))
 
+       if !ShouldRegisterService(svc) {
+               kv := c.Cache().Get(indexKey)
+               if kv != nil {
+                       c.Notify(pb.EVT_DELETE, indexKey, kv)
+               }
+               return
+       }
+
        switch evt.EventType {
        case pb.EVT_CREATE:
                kv := AsKeyValue(indexKey, serviceId, svc.ResourceVersion)
diff --git a/server/plugin/pkg/discovery/k8s/adaptor/cacher_instance.go 
b/server/plugin/pkg/discovery/k8s/adaptor/cacher_instance.go
index 852c69e..55c0f3e 100644
--- a/server/plugin/pkg/discovery/k8s/adaptor/cacher_instance.go
+++ b/server/plugin/pkg/discovery/k8s/adaptor/cacher_instance.go
@@ -21,7 +21,6 @@ import (
        pb 
"github.com/apache/incubator-servicecomb-service-center/server/core/proto"
        
"github.com/apache/incubator-servicecomb-service-center/server/plugin/pkg/discovery"
        "k8s.io/api/core/v1"
-       meta "k8s.io/apimachinery/pkg/apis/meta/v1"
        "reflect"
        "strconv"
 )
@@ -33,29 +32,25 @@ type InstanceCacher struct {
 // onServiceEvent is the method to refresh service cache
 func (c *InstanceCacher) onServiceEvent(evt K8sEvent) {
        svc := evt.Object.(*v1.Service)
-       if svc.Namespace == meta.NamespaceSystem {
-               return
-       }
-
        domainProject := Kubernetes().GetDomainProject()
        serviceId := string(svc.UID)
-       instKey := core.GenerateInstanceKey(domainProject, serviceId, "")
 
        switch evt.EventType {
        case pb.EVT_DELETE:
-               // instances
-               var kvs []*discovery.KeyValue
-               c.Cache().GetPrefix(instKey, &kvs)
-               for _, kv := range kvs {
-                       key := util.BytesToStringWithNoCopy(kv.Key)
-                       c.Notify(pb.EVT_DELETE, key, kv)
+               c.deleteInstances(domainProject, serviceId)
+       case pb.EVT_UPDATE:
+               if !ShouldRegisterService(svc) {
+                       c.deleteInstances(domainProject, serviceId)
+                       return
                }
+               ep := Kubernetes().GetEndpoints(svc.Namespace, svc.Name)
+               c.onEndpointsEvent(K8sEvent{pb.EVT_CREATE, ep, nil})
        }
 }
 
-func (c *InstanceCacher) getInstances(serviceId string) (m 
map[string]*discovery.KeyValue) {
+func (c *InstanceCacher) getInstances(domainProject, serviceId string) (m 
map[string]*discovery.KeyValue) {
        var arr []*discovery.KeyValue
-       key := core.GenerateInstanceKey(Kubernetes().GetDomainProject(), 
serviceId, "")
+       key := core.GenerateInstanceKey(domainProject, serviceId, "")
        if l := c.Cache().GetPrefix(key, &arr); l > 0 {
                m = make(map[string]*discovery.KeyValue, l)
                for _, kv := range arr {
@@ -65,20 +60,26 @@ func (c *InstanceCacher) getInstances(serviceId string) (m 
map[string]*discovery
        return
 }
 
+func (c *InstanceCacher) deleteInstances(domainProject, serviceId string) {
+       var kvs []*discovery.KeyValue
+       c.Cache().GetPrefix(core.GenerateInstanceKey(domainProject, serviceId, 
""), &kvs)
+       for _, kv := range kvs {
+               key := util.BytesToStringWithNoCopy(kv.Key)
+               c.Notify(pb.EVT_DELETE, key, kv)
+       }
+}
+
 // onEndpointsEvent is the method to refresh instance cache
 func (c *InstanceCacher) onEndpointsEvent(evt K8sEvent) {
        ep := evt.Object.(*v1.Endpoints)
-       if ep.Namespace == meta.NamespaceSystem {
-               return
-       }
-
        svc := Kubernetes().GetService(ep.Namespace, ep.Name)
-       if svc == nil {
+       if svc == nil || !ShouldRegisterService(svc) {
                return
        }
 
        serviceId := string(svc.UID)
-       oldKvs := c.getInstances(serviceId)
+       domainProject := Kubernetes().GetDomainProject()
+       oldKvs := c.getInstances(domainProject, serviceId)
        newKvs := make(map[string]*discovery.KeyValue)
        for _, ss := range ep.Subsets {
                for _, ea := range ss.Addresses {
@@ -108,6 +109,9 @@ func (c *InstanceCacher) onEndpointsEvent(evt K8sEvent) {
                                        DataCenterInfo: &pb.DataCenterInfo{},
                                        Timestamp:      
strconv.FormatInt(pod.CreationTimestamp.Unix(), 10),
                                        Version:        getLabel(svc.Labels, 
LabelVersion, pb.VERSION),
+                                       Properties: map[string]string{
+                                               PropNodeIP: pod.Status.HostIP,
+                                       },
                                }
                                inst.DataCenterInfo.Region, 
inst.DataCenterInfo.AvailableZone = getRegionAZ(node)
                                inst.ModTimestamp = inst.Timestamp
diff --git a/server/plugin/pkg/discovery/k8s/adaptor/cacher_service.go 
b/server/plugin/pkg/discovery/k8s/adaptor/cacher_service.go
index c1455ba..cb5fcf0 100644
--- a/server/plugin/pkg/discovery/k8s/adaptor/cacher_service.go
+++ b/server/plugin/pkg/discovery/k8s/adaptor/cacher_service.go
@@ -20,7 +20,6 @@ import (
        pb 
"github.com/apache/incubator-servicecomb-service-center/server/core/proto"
        
"github.com/apache/incubator-servicecomb-service-center/server/plugin/pkg/discovery"
        "k8s.io/api/core/v1"
-       meta "k8s.io/apimachinery/pkg/apis/meta/v1"
 )
 
 type ServiceCacher struct {
@@ -30,18 +29,25 @@ type ServiceCacher struct {
 // onServiceEvent is the method to refresh service cache
 func (c *ServiceCacher) onServiceEvent(evt K8sEvent) {
        svc := evt.Object.(*v1.Service)
-       if svc.Namespace == meta.NamespaceSystem {
-               return
-       }
-
        domainProject := Kubernetes().GetDomainProject()
        serviceId := string(svc.UID)
        key := core.GenerateServiceKey(domainProject, serviceId)
 
+       if !ShouldRegisterService(svc) {
+               kv := c.Cache().Get(key)
+               if kv != nil {
+                       c.Notify(pb.EVT_DELETE, key, kv)
+               }
+               return
+       }
+
        switch evt.EventType {
        case pb.EVT_CREATE, pb.EVT_UPDATE:
                ms := FromK8sService(svc)
                kv := AsKeyValue(key, ms, svc.ResourceVersion)
+               if c.Cache().Get(key) == nil {
+                       evt.EventType = pb.EVT_CREATE
+               }
                c.Notify(evt.EventType, key, kv)
        case pb.EVT_DELETE:
                // service
diff --git a/server/plugin/pkg/discovery/k8s/adaptor/common.go 
b/server/plugin/pkg/discovery/k8s/adaptor/common.go
index ed9a88f..cf67a62 100644
--- a/server/plugin/pkg/discovery/k8s/adaptor/common.go
+++ b/server/plugin/pkg/discovery/k8s/adaptor/common.go
@@ -18,6 +18,8 @@ package adaptor
 import (
        "github.com/apache/incubator-servicecomb-service-center/pkg/queue"
        "github.com/apache/incubator-servicecomb-service-center/pkg/util"
+       "k8s.io/api/core/v1"
+       meta "k8s.io/apimachinery/pkg/apis/meta/v1"
        "k8s.io/client-go/kubernetes"
        "k8s.io/client-go/tools/clientcmd"
        "time"
@@ -44,10 +46,14 @@ const (
        LabelNodeRegion  = "failure-domain.beta.kubernetes.io/region"
        LabelNodeAZ      = "failure-domain.beta.kubernetes.io/zone"
 
+       // register annotations
+       AnnotationRegister = "service-center.servicecomb.io/register"
+
        // properties
        PropNamespace    = "namespace"
        PropExternalName = "externalName"
        PropServiceType  = "type"
+       PropNodeIP       = "nodeIP"
 
        minWaitInterval       = 5 * time.Second
        defaultResyncInterval = 60 * time.Second
@@ -79,3 +85,13 @@ func Queue(t K8sType) *queue.TaskQueue {
        })
        return q.(*queue.TaskQueue)
 }
+
+func ShouldRegisterService(service *v1.Service) bool {
+       if service.Namespace == meta.NamespaceSystem {
+               return false
+       }
+       if register, ok := service.ObjectMeta.Annotations[AnnotationRegister]; 
ok && register == "false" {
+               return false
+       }
+       return true
+}
diff --git a/server/plugin/pkg/discovery/k8s/adaptor/kube_client.go 
b/server/plugin/pkg/discovery/k8s/adaptor/kube_client.go
index 422d9a9..b1422d0 100644
--- a/server/plugin/pkg/discovery/k8s/adaptor/kube_client.go
+++ b/server/plugin/pkg/discovery/k8s/adaptor/kube_client.go
@@ -178,6 +178,19 @@ func (c *K8sClient) GetService(namespace, name string) 
(svc *v1.Service) {
        return
 }
 
+func (c *K8sClient) GetEndpoints(namespace, name string) (ep *v1.Endpoints) {
+       obj, ok, err := 
c.Endpoints().GetStore().GetByKey(getFullName(namespace, name))
+       if err != nil {
+               log.Errorf(err, "get k8s endpoints[%s/%s] failed", namespace, 
name)
+               return
+       }
+       if !ok {
+               return
+       }
+       ep = obj.(*v1.Endpoints)
+       return
+}
+
 func (c *K8sClient) GetPodByIP(ip string) (pod *v1.Pod) {
        itf, ok := c.ipIndex.Get(ip)
        if !ok {
diff --git a/server/plugin/pkg/discovery/servicecenter/aggregate.go 
b/server/plugin/pkg/discovery/servicecenter/aggregate.go
index fd14ad6..fb3e818 100644
--- a/server/plugin/pkg/discovery/servicecenter/aggregate.go
+++ b/server/plugin/pkg/discovery/servicecenter/aggregate.go
@@ -16,16 +16,30 @@
 package servicecenter
 
 import (
+       "crypto/tls"
        "github.com/apache/incubator-servicecomb-service-center/pkg/client/sc"
        "github.com/apache/incubator-servicecomb-service-center/pkg/log"
        
"github.com/apache/incubator-servicecomb-service-center/server/admin/model"
        pb 
"github.com/apache/incubator-servicecomb-service-center/server/core/proto"
        scerr 
"github.com/apache/incubator-servicecomb-service-center/server/error"
+       mgr 
"github.com/apache/incubator-servicecomb-service-center/server/plugin"
        
"github.com/apache/incubator-servicecomb-service-center/server/plugin/pkg/registry"
+       "strings"
 )
 
 type SCClientAggregate []*sc.SCClient
 
+var clientTLS *tls.Config
+
+func getClientTLS() (*tls.Config, error) {
+       if clientTLS != nil {
+               return clientTLS, nil
+       }
+       var err error
+       clientTLS, err = mgr.Plugins().TLS().ClientConfig()
+       return clientTLS, err
+}
+
 func (c *SCClientAggregate) GetScCache() (*model.Cache, error) {
        var caches model.Cache
        for _, client := range *c {
@@ -90,6 +104,14 @@ func NewSCClientAggregate() *SCClientAggregate {
                        continue
                }
                client.Timeout = registry.Configuration().RequestTimeOut
+               if strings.Index(addr[0], "https") >= 0 {
+                       client.TLS, err = getClientTLS()
+                       if err != nil {
+                               log.Errorf(err, "get service center[%s][%s] tls 
config failed", name, addr)
+                               continue
+                       }
+               }
+
                *c = append(*c, client)
                log.Infof("new service center[%s][%s] client", name, addr)
        }
diff --git a/server/plugin/pkg/discovery/servicecenter/cluster.go 
b/server/plugin/pkg/discovery/servicecenter/cluster.go
index 5c0bdd0..e6f4be9 100644
--- a/server/plugin/pkg/discovery/servicecenter/cluster.go
+++ b/server/plugin/pkg/discovery/servicecenter/cluster.go
@@ -93,7 +93,6 @@ func (c *ServiceCenterCluster) Sync(ctx context.Context) 
error {
        }
 
        // microservice
-       // TODO remove duplicate SERVICECENTER
        serviceCacher, ok := c.cachers[backend.SERVICE]
        if ok {
                c.check(serviceCacher, &cache.Microservices)
@@ -132,7 +131,12 @@ func (c *ServiceCenterCluster) Sync(ctx context.Context) 
error {
 }
 
 func (c *ServiceCenterCluster) check(local *ServiceCenterCacher, remote 
model.Getter) {
+       exists := make(map[string]struct{})
        remote.ForEach(func(_ int, v *model.KV) bool {
+               if _, ok := exists[v.Key]; ok {
+                       return true
+               }
+               exists[v.Key] = struct{}{}
                kv := local.Cache().Get(v.Key)
                newKv := &discovery.KeyValue{
                        Key:            util.StringToBytesWithNoCopy(v.Key),
diff --git a/server/plugin/pkg/registry/config.go 
b/server/plugin/pkg/registry/config.go
index fbfa9f3..b75cfa4 100644
--- a/server/plugin/pkg/registry/config.go
+++ b/server/plugin/pkg/registry/config.go
@@ -43,23 +43,39 @@ type Config struct {
 
 func (c *Config) InitClusters() {
        c.Clusters = make(Clusters)
-       kvs := strings.Split(c.ClusterAddresses, ",")
-       for _, cluster := range kvs {
-               // sc-0=http(s)://host1:port1|http(s)://host2:port2
-               names := strings.Split(cluster, "=")
-               if len(names) != 2 {
-                       continue
+       // 
sc-0=http(s)://host1:port1,http(s)://host2:port2,sc-1=http(s)://host3:port3
+       kvs := strings.Split(c.ClusterAddresses, "=")
+       if l := len(kvs); l >= 2 {
+               var (
+                       names []string
+                       addrs [][]string
+               )
+               for i := 0; i < l; i++ {
+                       ss := strings.Split(kvs[i], ",")
+                       sl := len(ss)
+                       if i != l-1 {
+                               names = append(names, ss[sl-1])
+                       }
+                       if i != 0 {
+                               if sl > 1 && i != l-1 {
+                                       addrs = append(addrs, ss[:sl-1])
+                               } else {
+                                       addrs = append(addrs, ss)
+                               }
+                       }
+               }
+               for i, name := range names {
+                       c.Clusters[name] = addrs[i]
                }
-               c.Clusters[names[0]] = strings.Split(names[1], "|")
        }
        if len(c.Clusters) == 0 {
                c.Clusters[c.ClusterName] = []string{c.ClusterAddresses}
        }
 }
 
-// ClusterAddress return the address of current SC's registry backend
-func (c *Config) ClusterAddress() string {
-       return c.Clusters[c.ClusterName][0]
+// RegistryAddresses return the address of current SC's registry backend
+func (c *Config) RegistryAddresses() []string {
+       return c.Clusters[c.ClusterName]
 }
 
 func Configuration() *Config {
diff --git a/server/plugin/pkg/registry/etcd/etcd.go 
b/server/plugin/pkg/registry/etcd/etcd.go
index c940828..0dd4f83 100644
--- a/server/plugin/pkg/registry/etcd/etcd.go
+++ b/server/plugin/pkg/registry/etcd/etcd.go
@@ -751,7 +751,7 @@ func (c *EtcdClient) ReOpen() error {
 
 func (c *EtcdClient) parseEndpoints() {
        // use the default cluster endpoints
-       addrs := strings.Split(registry.Configuration().ClusterAddress(), ",")
+       addrs := registry.Configuration().RegistryAddresses()
 
        endpoints := make([]string, 0, len(addrs))
        for _, addr := range addrs {
diff --git a/server/plugin/pkg/registry/etcd/etcd_test.go 
b/server/plugin/pkg/registry/etcd/etcd_test.go
index 5fc3fa6..f3856b4 100644
--- a/server/plugin/pkg/registry/etcd/etcd_test.go
+++ b/server/plugin/pkg/registry/etcd/etcd_test.go
@@ -29,6 +29,7 @@ import (
        "net/http"
        "os"
        "strconv"
+       "strings"
        "sync"
        "testing"
        "time"
@@ -42,6 +43,49 @@ var (
        endpoint = registry.Configuration().ClusterAddresses
 )
 
+func TestInitCluster(t *testing.T) {
+       registry.Configuration().ClusterAddresses = "127.0.0.1:2379"
+       registry.Configuration().InitClusters()
+       if strings.Join(registry.Configuration().RegistryAddresses(), ",") != 
"127.0.0.1:2379" {
+               t.Fatalf("TestInitCluster failed, %v", 
registry.Configuration().RegistryAddresses())
+       }
+       registry.Configuration().ClusterAddresses = 
"127.0.0.1:2379,127.0.0.2:2379"
+       registry.Configuration().InitClusters()
+       if strings.Join(registry.Configuration().RegistryAddresses(), ",") != 
"127.0.0.1:2379,127.0.0.2:2379" {
+               t.Fatalf("TestInitCluster failed, %v", 
registry.Configuration().RegistryAddresses())
+       }
+       registry.Configuration().ClusterName = "sc-0"
+       registry.Configuration().ClusterAddresses = 
"sc-0=127.0.0.1:2379,127.0.0.2:2379"
+       registry.Configuration().InitClusters()
+       if strings.Join(registry.Configuration().RegistryAddresses(), ",") != 
"127.0.0.1:2379,127.0.0.2:2379" {
+               t.Fatalf("TestInitCluster failed, %v", 
registry.Configuration().RegistryAddresses())
+       }
+       if strings.Join(registry.Configuration().Clusters["sc-0"], ",") != 
"127.0.0.1:2379,127.0.0.2:2379" {
+               t.Fatalf("TestInitCluster failed, %v", 
registry.Configuration().RegistryAddresses())
+       }
+       registry.Configuration().ClusterName = "sc-0"
+       registry.Configuration().ClusterAddresses = 
"sc-1=127.0.0.1:2379,127.0.0.2:2379,sc-2=127.0.0.3:2379"
+       registry.Configuration().InitClusters()
+       if strings.Join(registry.Configuration().RegistryAddresses(), ",") != 
"" {
+               t.Fatalf("TestInitCluster failed, %v", 
registry.Configuration().RegistryAddresses())
+       }
+       if strings.Join(registry.Configuration().Clusters["sc-1"], ",") != 
"127.0.0.1:2379,127.0.0.2:2379" {
+               t.Fatalf("TestInitCluster failed, %v", 
registry.Configuration().RegistryAddresses())
+       }
+       if strings.Join(registry.Configuration().Clusters["sc-2"], ",") != 
"127.0.0.3:2379" {
+               t.Fatalf("TestInitCluster failed, %v", 
registry.Configuration().RegistryAddresses())
+       }
+       registry.Configuration().ClusterName = "sc-0"
+       registry.Configuration().ClusterAddresses = 
"sc-0=127.0.0.1:2379,sc-1=127.0.0.3:2379,127.0.0.4:2379"
+       registry.Configuration().InitClusters()
+       if strings.Join(registry.Configuration().RegistryAddresses(), ",") != 
"127.0.0.1:2379" {
+               t.Fatalf("TestInitCluster failed, %v", 
registry.Configuration().RegistryAddresses())
+       }
+       if strings.Join(registry.Configuration().Clusters["sc-1"], ",") != 
"127.0.0.3:2379,127.0.0.4:2379" {
+               t.Fatalf("TestInitCluster failed, %v", 
registry.Configuration().RegistryAddresses())
+       }
+}
+
 func TestEtcdClient(t *testing.T) {
        etcd := &EtcdClient{
                Endpoints:   []string{endpoint},

Reply via email to