This is an automated email from the ASF dual-hosted git repository.
asifdxtreme pushed a commit to branch master
in repository
https://gitbox.apache.org/repos/asf/incubator-servicecomb-service-center.git
The following commit(s) were added to refs/heads/master by this push:
new 3fe59ed SCB-953 Support sync distinct Kubernetes service types to
service-center (#458)
3fe59ed is described below
commit 3fe59edb7c6de5c58fb19aeb7b29a1158124e03e
Author: little-cui <[email protected]>
AuthorDate: Thu Oct 18 17:02:06 2018 +0800
SCB-953 Support sync distinct Kubernetes service types to service-center
(#458)
* Bug fixes
* Should remove the instances when the register service annotation is set to false
* Update docs
* remove the duplicate cache
* Parse multiple endpoints from one registry
* SC Aggregate supports enabling TLS
---
README.md | 10 +++--
docs/README.md | 2 +-
docs/helm.md | 20 ++--------
docs/multidcs.md | 9 +++++
docs/multidcs2.PNG | Bin 0 -> 109260 bytes
.../k8s/service-center/templates/configmap.yaml | 3 +-
.../k8s/service-center/templates/deployment.yaml | 3 +-
.../k8s/service-center/templates/service.yaml | 3 +-
server/admin/service.go | 5 +++
server/plugin/pkg/discovery/aggregate/adaptor.go | 14 ++++++-
.../pkg/discovery/k8s/adaptor/cacher_index.go | 13 +++---
.../pkg/discovery/k8s/adaptor/cacher_instance.go | 44 +++++++++++----------
.../pkg/discovery/k8s/adaptor/cacher_service.go | 16 +++++---
server/plugin/pkg/discovery/k8s/adaptor/common.go | 16 ++++++++
.../pkg/discovery/k8s/adaptor/kube_client.go | 13 ++++++
.../pkg/discovery/servicecenter/aggregate.go | 22 +++++++++++
.../plugin/pkg/discovery/servicecenter/cluster.go | 6 ++-
server/plugin/pkg/registry/config.go | 36 ++++++++++++-----
server/plugin/pkg/registry/etcd/etcd.go | 2 +-
server/plugin/pkg/registry/etcd/etcd_test.go | 44 +++++++++++++++++++++
20 files changed, 210 insertions(+), 71 deletions(-)
diff --git a/README.md b/README.md
index e2fd849..ecdf7d9 100644
--- a/README.md
+++ b/README.md
@@ -7,7 +7,7 @@ Apache ServiceComb (incubating) Service-Center is a Restful
based service-regist
- **[`Open API`](/server/core/swagger/v4.yaml)**: API doc(Open API format)
management for microservice
- **Metadata**: Metadata management for both microservice and microservice
instance
- **Dependency**: Microservice dependency management
- - **Seperated**: Seperated microservice and microservice instance entity
management
+ - **Separated**: Separated microservice and microservice instance entity
management
- **Domains**: Logical multiple domains management
- **Security**: White and black list configuration for service discovery
- **Discovery**: Support query instance by criteria
@@ -17,8 +17,12 @@ Apache ServiceComb (incubating) Service-Center is a Restful
based service-regist
- **Performance**: Performance/Caching design
- **[`Metrics`](/docs/integration-grafana.md)**: Able to expose Prometheus
metric API automatically
- **[`Tracing`](/docs/tracing.md)**: Able to report tracing data to Zipkin
server
- - **[`Multi Datacenter`](/docs/multidcs.md)**: Additional layer of
abstraction to clusters deployed in multiple datacenters
- - **[`Dynmaic Plug-in`](/docs/plugin.md)**: Able to load custom
authentication, tls and other dynamic libraries
+ - **[`Pluginable`](/docs/plugin.md)**: Able to load custom authentication,
tls and other dynamic libraries
+ - **[`CLI`](/scctl/pkg/plugin/README.md)**: Easy to control service center
+ - **[`Kubernetes`](/docs/kubeclusters.md)**: Embrace kubernetes ecosystem and
support multi cluster service discovery
+ - **[`Datacenters`](/docs/multidcs.md)**: Additional layer of abstraction to
clusters deployed in multiple datacenters
+ - **[`Aggregation`](/docs/aggregate.md)**: Able to aggregate microservices
from multiple registry platforms and
+ support platform registry and client side registry at the same time
## Documentation
diff --git a/docs/README.md b/docs/README.md
index c827fb4..0d9d719 100644
--- a/docs/README.md
+++ b/docs/README.md
@@ -30,7 +30,7 @@ Swagger
[`v4`](/server/core/swagger/v4.yaml)|[`v3`](/server/core/swagger/v3.yaml
+ **Kubernetes**
- [Helm Chart](/docs/helm.md)
- [Access Distinct Clusters](/docs/kubeclusters.md)
- - [Multiple Platforms Micro-services Manage](/docs/aggregate.md)
+ - [Multiple Registry Aggregation](/docs/aggregate.md)
#### Monitoring Service-Center
diff --git a/docs/helm.md b/docs/helm.md
index 9c04507..405ea11 100644
--- a/docs/helm.md
+++ b/docs/helm.md
@@ -72,25 +72,11 @@ cd ${PROJECT_ROOT}/examples/infrastructures/k8s
# install etcd cluster
helm install --name coreos --namespace default etcd/
# install sc cluster
-helm install --name dc1 --namespace default \
- --set frontend.deployment=false \
- --set sc.image.repository="desktop-0028:5000/servicecomb/service-center" \
- --set sc.discovery.type="aggregate" \
- --set sc.discovery.aggregate="etcd\,servicecenter" \
- --set
sc.discovery.clusters="sc-1=http://c1-etcd-client:2379\,sc-2=http://dc2-service-center:30100"
\
- --set sc.registry.enabled=true \
- --set sc.registry.type="etcd" \
- --set sc.registry.name="sc-1" \
- service-center/
-helm install --name dc2 --namespace default \
- --set frontend.deployment=false \
- --set sc.image.repository="desktop-0028:5000/servicecomb/service-center" \
- --set sc.discovery.type="aggregate" \
- --set sc.discovery.aggregate="etcd\,servicecenter" \
- --set
sc.discovery.clusters="sc-2=http://c2-etcd-client:2379\,sc-1=http://dc1-service-center:30100"
\
+helm install --name servicecomb --namespace default \
+ --set sc.discovery.type="etcd" \
+ --set sc.discovery.clusters="http://coreos-etcd-client:2379" \
--set sc.registry.enabled=true \
--set sc.registry.type="etcd" \
- --set sc.registry.name="sc-2" \
service-center/
```
diff --git a/docs/multidcs.md b/docs/multidcs.md
index ecba0ea..3354ff6 100644
--- a/docs/multidcs.md
+++ b/docs/multidcs.md
@@ -14,6 +14,15 @@ isolated from each other. Another implementation of the
discovery plug-in, `Serv
access multiple SC instances and periodically pull up micro-service instance
information so that if some
micro-services can request aggregate, cross-DCs can be implemented using the
same API as SC cluster.
+If SC aggregate is not deployed globally, SC also supports another way to
implement multiple DCs discovery,
+as shown below.
+
+
+
+The difference between the two approaches is that global deployment aggregate
can divert service discovery traffic,
+the whole architecture is more like a read-write separation architecture, and
the SC of each DC manages microservice
+information independently, which reduces the complexity. So we recommend the
first architecture.
+
## Quick Start
Let's assume you want to install 2 clusters of Service-Center in different DCs
with following details.
diff --git a/docs/multidcs2.PNG b/docs/multidcs2.PNG
new file mode 100644
index 0000000..1190e4c
Binary files /dev/null and b/docs/multidcs2.PNG differ
diff --git
a/examples/infrastructures/k8s/service-center/templates/configmap.yaml
b/examples/infrastructures/k8s/service-center/templates/configmap.yaml
index 1a2c72d..56820b8 100644
--- a/examples/infrastructures/k8s/service-center/templates/configmap.yaml
+++ b/examples/infrastructures/k8s/service-center/templates/configmap.yaml
@@ -35,8 +35,7 @@ data:
ssl_mode = 0
enable_pprof = 1
{{- end }}
-
-{{- if .Values.frontend.deployment -}}
+{{- if .Values.frontend.deployment }}
---
apiVersion: v1
kind: ConfigMap
diff --git
a/examples/infrastructures/k8s/service-center/templates/deployment.yaml
b/examples/infrastructures/k8s/service-center/templates/deployment.yaml
index 8bf7418..ddaf8f5 100644
--- a/examples/infrastructures/k8s/service-center/templates/deployment.yaml
+++ b/examples/infrastructures/k8s/service-center/templates/deployment.yaml
@@ -53,8 +53,7 @@ spec:
{{ toYaml .Values.sc.nodeSelector | indent 8 }}
{{- end }}
{{- end }}
-
-{{- if .Values.frontend.deployment -}}
+{{- if .Values.frontend.deployment }}
---
apiVersion: extensions/v1beta1
kind: Deployment
diff --git a/examples/infrastructures/k8s/service-center/templates/service.yaml
b/examples/infrastructures/k8s/service-center/templates/service.yaml
index 078b6df..f835ef6 100644
--- a/examples/infrastructures/k8s/service-center/templates/service.yaml
+++ b/examples/infrastructures/k8s/service-center/templates/service.yaml
@@ -24,8 +24,7 @@ spec:
app: {{ template "service-center.name" . }}
release: {{ .Release.Name }}
{{- end }}
-
-{{- if .Values.frontend.deployment -}}
+{{- if .Values.frontend.deployment }}
---
apiVersion: v1
kind: Service
diff --git a/server/admin/service.go b/server/admin/service.go
index 2a86d80..6255f38 100644
--- a/server/admin/service.go
+++ b/server/admin/service.go
@@ -95,7 +95,12 @@ func (service *AdminService) dumpAll(ctx context.Context,
cache *model.Cache) {
}
func setValue(e discovery.Adaptor, setter model.Setter) {
+ exists := make(map[string]struct{})
e.Cache().ForEach(func(k string, kv *discovery.KeyValue) (next bool) {
+ if _, ok := exists[k]; ok {
+ return true
+ }
+ exists[k] = struct{}{}
setter.SetValue(&model.KV{Key: k, Rev: kv.ModRevision, Value:
kv.Value})
return true
})
diff --git a/server/plugin/pkg/discovery/aggregate/adaptor.go
b/server/plugin/pkg/discovery/aggregate/adaptor.go
index 7196d11..cb66980 100644
--- a/server/plugin/pkg/discovery/aggregate/adaptor.go
+++ b/server/plugin/pkg/discovery/aggregate/adaptor.go
@@ -16,6 +16,7 @@
package aggregate
import (
+ "github.com/apache/incubator-servicecomb-service-center/pkg/util"
mgr
"github.com/apache/incubator-servicecomb-service-center/server/plugin"
"github.com/apache/incubator-servicecomb-service-center/server/plugin/pkg/discovery"
"github.com/apache/incubator-servicecomb-service-center/server/plugin/pkg/registry"
@@ -26,13 +27,22 @@ import (
type Aggregator []discovery.Adaptor
func (as Aggregator) Search(ctx context.Context, opts
...registry.PluginOpOption) (*discovery.Response, error) {
- var response discovery.Response
+ var (
+ response discovery.Response
+ exists = make(map[string]struct{})
+ )
for _, a := range as {
resp, err := a.Search(ctx, opts...)
if err != nil {
continue
}
- response.Kvs = append(response.Kvs, resp.Kvs...)
+ for _, kv := range resp.Kvs {
+ key := util.BytesToStringWithNoCopy(kv.Key)
+ if _, ok := exists[key]; !ok {
+ exists[key] = struct{}{}
+ response.Kvs = append(response.Kvs, kv)
+ }
+ }
response.Count += resp.Count
}
return &response, nil
diff --git a/server/plugin/pkg/discovery/k8s/adaptor/cacher_index.go
b/server/plugin/pkg/discovery/k8s/adaptor/cacher_index.go
index 2f0651c..b03a637 100644
--- a/server/plugin/pkg/discovery/k8s/adaptor/cacher_index.go
+++ b/server/plugin/pkg/discovery/k8s/adaptor/cacher_index.go
@@ -20,7 +20,6 @@ import (
pb
"github.com/apache/incubator-servicecomb-service-center/server/core/proto"
"github.com/apache/incubator-servicecomb-service-center/server/plugin/pkg/discovery"
"k8s.io/api/core/v1"
- meta "k8s.io/apimachinery/pkg/apis/meta/v1"
)
type ServiceIndexCacher struct {
@@ -30,14 +29,18 @@ type ServiceIndexCacher struct {
// onServiceEvent is the method to refresh service cache
func (c *ServiceIndexCacher) onServiceEvent(evt K8sEvent) {
svc := evt.Object.(*v1.Service)
- if svc.Namespace == meta.NamespaceSystem {
- return
- }
-
domainProject := Kubernetes().GetDomainProject()
serviceId := string(svc.UID)
indexKey :=
core.GenerateServiceIndexKey(generateServiceKey(domainProject, svc))
+ if !ShouldRegisterService(svc) {
+ kv := c.Cache().Get(indexKey)
+ if kv != nil {
+ c.Notify(pb.EVT_DELETE, indexKey, kv)
+ }
+ return
+ }
+
switch evt.EventType {
case pb.EVT_CREATE:
kv := AsKeyValue(indexKey, serviceId, svc.ResourceVersion)
diff --git a/server/plugin/pkg/discovery/k8s/adaptor/cacher_instance.go
b/server/plugin/pkg/discovery/k8s/adaptor/cacher_instance.go
index 852c69e..55c0f3e 100644
--- a/server/plugin/pkg/discovery/k8s/adaptor/cacher_instance.go
+++ b/server/plugin/pkg/discovery/k8s/adaptor/cacher_instance.go
@@ -21,7 +21,6 @@ import (
pb
"github.com/apache/incubator-servicecomb-service-center/server/core/proto"
"github.com/apache/incubator-servicecomb-service-center/server/plugin/pkg/discovery"
"k8s.io/api/core/v1"
- meta "k8s.io/apimachinery/pkg/apis/meta/v1"
"reflect"
"strconv"
)
@@ -33,29 +32,25 @@ type InstanceCacher struct {
// onServiceEvent is the method to refresh service cache
func (c *InstanceCacher) onServiceEvent(evt K8sEvent) {
svc := evt.Object.(*v1.Service)
- if svc.Namespace == meta.NamespaceSystem {
- return
- }
-
domainProject := Kubernetes().GetDomainProject()
serviceId := string(svc.UID)
- instKey := core.GenerateInstanceKey(domainProject, serviceId, "")
switch evt.EventType {
case pb.EVT_DELETE:
- // instances
- var kvs []*discovery.KeyValue
- c.Cache().GetPrefix(instKey, &kvs)
- for _, kv := range kvs {
- key := util.BytesToStringWithNoCopy(kv.Key)
- c.Notify(pb.EVT_DELETE, key, kv)
+ c.deleteInstances(domainProject, serviceId)
+ case pb.EVT_UPDATE:
+ if !ShouldRegisterService(svc) {
+ c.deleteInstances(domainProject, serviceId)
+ return
}
+ ep := Kubernetes().GetEndpoints(svc.Namespace, svc.Name)
+ c.onEndpointsEvent(K8sEvent{pb.EVT_CREATE, ep, nil})
}
}
-func (c *InstanceCacher) getInstances(serviceId string) (m
map[string]*discovery.KeyValue) {
+func (c *InstanceCacher) getInstances(domainProject, serviceId string) (m
map[string]*discovery.KeyValue) {
var arr []*discovery.KeyValue
- key := core.GenerateInstanceKey(Kubernetes().GetDomainProject(),
serviceId, "")
+ key := core.GenerateInstanceKey(domainProject, serviceId, "")
if l := c.Cache().GetPrefix(key, &arr); l > 0 {
m = make(map[string]*discovery.KeyValue, l)
for _, kv := range arr {
@@ -65,20 +60,26 @@ func (c *InstanceCacher) getInstances(serviceId string) (m
map[string]*discovery
return
}
+func (c *InstanceCacher) deleteInstances(domainProject, serviceId string) {
+ var kvs []*discovery.KeyValue
+ c.Cache().GetPrefix(core.GenerateInstanceKey(domainProject, serviceId,
""), &kvs)
+ for _, kv := range kvs {
+ key := util.BytesToStringWithNoCopy(kv.Key)
+ c.Notify(pb.EVT_DELETE, key, kv)
+ }
+}
+
// onEndpointsEvent is the method to refresh instance cache
func (c *InstanceCacher) onEndpointsEvent(evt K8sEvent) {
ep := evt.Object.(*v1.Endpoints)
- if ep.Namespace == meta.NamespaceSystem {
- return
- }
-
svc := Kubernetes().GetService(ep.Namespace, ep.Name)
- if svc == nil {
+ if svc == nil || !ShouldRegisterService(svc) {
return
}
serviceId := string(svc.UID)
- oldKvs := c.getInstances(serviceId)
+ domainProject := Kubernetes().GetDomainProject()
+ oldKvs := c.getInstances(domainProject, serviceId)
newKvs := make(map[string]*discovery.KeyValue)
for _, ss := range ep.Subsets {
for _, ea := range ss.Addresses {
@@ -108,6 +109,9 @@ func (c *InstanceCacher) onEndpointsEvent(evt K8sEvent) {
DataCenterInfo: &pb.DataCenterInfo{},
Timestamp:
strconv.FormatInt(pod.CreationTimestamp.Unix(), 10),
Version: getLabel(svc.Labels,
LabelVersion, pb.VERSION),
+ Properties: map[string]string{
+ PropNodeIP: pod.Status.HostIP,
+ },
}
inst.DataCenterInfo.Region,
inst.DataCenterInfo.AvailableZone = getRegionAZ(node)
inst.ModTimestamp = inst.Timestamp
diff --git a/server/plugin/pkg/discovery/k8s/adaptor/cacher_service.go
b/server/plugin/pkg/discovery/k8s/adaptor/cacher_service.go
index c1455ba..cb5fcf0 100644
--- a/server/plugin/pkg/discovery/k8s/adaptor/cacher_service.go
+++ b/server/plugin/pkg/discovery/k8s/adaptor/cacher_service.go
@@ -20,7 +20,6 @@ import (
pb
"github.com/apache/incubator-servicecomb-service-center/server/core/proto"
"github.com/apache/incubator-servicecomb-service-center/server/plugin/pkg/discovery"
"k8s.io/api/core/v1"
- meta "k8s.io/apimachinery/pkg/apis/meta/v1"
)
type ServiceCacher struct {
@@ -30,18 +29,25 @@ type ServiceCacher struct {
// onServiceEvent is the method to refresh service cache
func (c *ServiceCacher) onServiceEvent(evt K8sEvent) {
svc := evt.Object.(*v1.Service)
- if svc.Namespace == meta.NamespaceSystem {
- return
- }
-
domainProject := Kubernetes().GetDomainProject()
serviceId := string(svc.UID)
key := core.GenerateServiceKey(domainProject, serviceId)
+ if !ShouldRegisterService(svc) {
+ kv := c.Cache().Get(key)
+ if kv != nil {
+ c.Notify(pb.EVT_DELETE, key, kv)
+ }
+ return
+ }
+
switch evt.EventType {
case pb.EVT_CREATE, pb.EVT_UPDATE:
ms := FromK8sService(svc)
kv := AsKeyValue(key, ms, svc.ResourceVersion)
+ if c.Cache().Get(key) == nil {
+ evt.EventType = pb.EVT_CREATE
+ }
c.Notify(evt.EventType, key, kv)
case pb.EVT_DELETE:
// service
diff --git a/server/plugin/pkg/discovery/k8s/adaptor/common.go
b/server/plugin/pkg/discovery/k8s/adaptor/common.go
index ed9a88f..cf67a62 100644
--- a/server/plugin/pkg/discovery/k8s/adaptor/common.go
+++ b/server/plugin/pkg/discovery/k8s/adaptor/common.go
@@ -18,6 +18,8 @@ package adaptor
import (
"github.com/apache/incubator-servicecomb-service-center/pkg/queue"
"github.com/apache/incubator-servicecomb-service-center/pkg/util"
+ "k8s.io/api/core/v1"
+ meta "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/clientcmd"
"time"
@@ -44,10 +46,14 @@ const (
LabelNodeRegion = "failure-domain.beta.kubernetes.io/region"
LabelNodeAZ = "failure-domain.beta.kubernetes.io/zone"
+ // register annotations
+ AnnotationRegister = "service-center.servicecomb.io/register"
+
// properties
PropNamespace = "namespace"
PropExternalName = "externalName"
PropServiceType = "type"
+ PropNodeIP = "nodeIP"
minWaitInterval = 5 * time.Second
defaultResyncInterval = 60 * time.Second
@@ -79,3 +85,13 @@ func Queue(t K8sType) *queue.TaskQueue {
})
return q.(*queue.TaskQueue)
}
+
+func ShouldRegisterService(service *v1.Service) bool {
+ if service.Namespace == meta.NamespaceSystem {
+ return false
+ }
+ if register, ok := service.ObjectMeta.Annotations[AnnotationRegister];
ok && register == "false" {
+ return false
+ }
+ return true
+}
diff --git a/server/plugin/pkg/discovery/k8s/adaptor/kube_client.go
b/server/plugin/pkg/discovery/k8s/adaptor/kube_client.go
index 422d9a9..b1422d0 100644
--- a/server/plugin/pkg/discovery/k8s/adaptor/kube_client.go
+++ b/server/plugin/pkg/discovery/k8s/adaptor/kube_client.go
@@ -178,6 +178,19 @@ func (c *K8sClient) GetService(namespace, name string)
(svc *v1.Service) {
return
}
+func (c *K8sClient) GetEndpoints(namespace, name string) (ep *v1.Endpoints) {
+ obj, ok, err :=
c.Endpoints().GetStore().GetByKey(getFullName(namespace, name))
+ if err != nil {
+ log.Errorf(err, "get k8s endpoints[%s/%s] failed", namespace,
name)
+ return
+ }
+ if !ok {
+ return
+ }
+ ep = obj.(*v1.Endpoints)
+ return
+}
+
func (c *K8sClient) GetPodByIP(ip string) (pod *v1.Pod) {
itf, ok := c.ipIndex.Get(ip)
if !ok {
diff --git a/server/plugin/pkg/discovery/servicecenter/aggregate.go
b/server/plugin/pkg/discovery/servicecenter/aggregate.go
index fd14ad6..fb3e818 100644
--- a/server/plugin/pkg/discovery/servicecenter/aggregate.go
+++ b/server/plugin/pkg/discovery/servicecenter/aggregate.go
@@ -16,16 +16,30 @@
package servicecenter
import (
+ "crypto/tls"
"github.com/apache/incubator-servicecomb-service-center/pkg/client/sc"
"github.com/apache/incubator-servicecomb-service-center/pkg/log"
"github.com/apache/incubator-servicecomb-service-center/server/admin/model"
pb
"github.com/apache/incubator-servicecomb-service-center/server/core/proto"
scerr
"github.com/apache/incubator-servicecomb-service-center/server/error"
+ mgr
"github.com/apache/incubator-servicecomb-service-center/server/plugin"
"github.com/apache/incubator-servicecomb-service-center/server/plugin/pkg/registry"
+ "strings"
)
type SCClientAggregate []*sc.SCClient
+var clientTLS *tls.Config
+
+func getClientTLS() (*tls.Config, error) {
+ if clientTLS != nil {
+ return clientTLS, nil
+ }
+ var err error
+ clientTLS, err = mgr.Plugins().TLS().ClientConfig()
+ return clientTLS, err
+}
+
func (c *SCClientAggregate) GetScCache() (*model.Cache, error) {
var caches model.Cache
for _, client := range *c {
@@ -90,6 +104,14 @@ func NewSCClientAggregate() *SCClientAggregate {
continue
}
client.Timeout = registry.Configuration().RequestTimeOut
+ if strings.Index(addr[0], "https") >= 0 {
+ client.TLS, err = getClientTLS()
+ if err != nil {
+ log.Errorf(err, "get service center[%s][%s] tls
config failed", name, addr)
+ continue
+ }
+ }
+
*c = append(*c, client)
log.Infof("new service center[%s][%s] client", name, addr)
}
diff --git a/server/plugin/pkg/discovery/servicecenter/cluster.go
b/server/plugin/pkg/discovery/servicecenter/cluster.go
index 5c0bdd0..e6f4be9 100644
--- a/server/plugin/pkg/discovery/servicecenter/cluster.go
+++ b/server/plugin/pkg/discovery/servicecenter/cluster.go
@@ -93,7 +93,6 @@ func (c *ServiceCenterCluster) Sync(ctx context.Context)
error {
}
// microservice
- // TODO remove duplicate SERVICECENTER
serviceCacher, ok := c.cachers[backend.SERVICE]
if ok {
c.check(serviceCacher, &cache.Microservices)
@@ -132,7 +131,12 @@ func (c *ServiceCenterCluster) Sync(ctx context.Context)
error {
}
func (c *ServiceCenterCluster) check(local *ServiceCenterCacher, remote
model.Getter) {
+ exists := make(map[string]struct{})
remote.ForEach(func(_ int, v *model.KV) bool {
+ if _, ok := exists[v.Key]; ok {
+ return true
+ }
+ exists[v.Key] = struct{}{}
kv := local.Cache().Get(v.Key)
newKv := &discovery.KeyValue{
Key: util.StringToBytesWithNoCopy(v.Key),
diff --git a/server/plugin/pkg/registry/config.go
b/server/plugin/pkg/registry/config.go
index fbfa9f3..b75cfa4 100644
--- a/server/plugin/pkg/registry/config.go
+++ b/server/plugin/pkg/registry/config.go
@@ -43,23 +43,39 @@ type Config struct {
func (c *Config) InitClusters() {
c.Clusters = make(Clusters)
- kvs := strings.Split(c.ClusterAddresses, ",")
- for _, cluster := range kvs {
- // sc-0=http(s)://host1:port1|http(s)://host2:port2
- names := strings.Split(cluster, "=")
- if len(names) != 2 {
- continue
+ //
sc-0=http(s)://host1:port1,http(s)://host2:port2,sc-1=http(s)://host3:port3
+ kvs := strings.Split(c.ClusterAddresses, "=")
+ if l := len(kvs); l >= 2 {
+ var (
+ names []string
+ addrs [][]string
+ )
+ for i := 0; i < l; i++ {
+ ss := strings.Split(kvs[i], ",")
+ sl := len(ss)
+ if i != l-1 {
+ names = append(names, ss[sl-1])
+ }
+ if i != 0 {
+ if sl > 1 && i != l-1 {
+ addrs = append(addrs, ss[:sl-1])
+ } else {
+ addrs = append(addrs, ss)
+ }
+ }
+ }
+ for i, name := range names {
+ c.Clusters[name] = addrs[i]
}
- c.Clusters[names[0]] = strings.Split(names[1], "|")
}
if len(c.Clusters) == 0 {
c.Clusters[c.ClusterName] = []string{c.ClusterAddresses}
}
}
-// ClusterAddress return the address of current SC's registry backend
-func (c *Config) ClusterAddress() string {
- return c.Clusters[c.ClusterName][0]
+// RegistryAddresses return the address of current SC's registry backend
+func (c *Config) RegistryAddresses() []string {
+ return c.Clusters[c.ClusterName]
}
func Configuration() *Config {
diff --git a/server/plugin/pkg/registry/etcd/etcd.go
b/server/plugin/pkg/registry/etcd/etcd.go
index c940828..0dd4f83 100644
--- a/server/plugin/pkg/registry/etcd/etcd.go
+++ b/server/plugin/pkg/registry/etcd/etcd.go
@@ -751,7 +751,7 @@ func (c *EtcdClient) ReOpen() error {
func (c *EtcdClient) parseEndpoints() {
// use the default cluster endpoints
- addrs := strings.Split(registry.Configuration().ClusterAddress(), ",")
+ addrs := registry.Configuration().RegistryAddresses()
endpoints := make([]string, 0, len(addrs))
for _, addr := range addrs {
diff --git a/server/plugin/pkg/registry/etcd/etcd_test.go
b/server/plugin/pkg/registry/etcd/etcd_test.go
index 5fc3fa6..f3856b4 100644
--- a/server/plugin/pkg/registry/etcd/etcd_test.go
+++ b/server/plugin/pkg/registry/etcd/etcd_test.go
@@ -29,6 +29,7 @@ import (
"net/http"
"os"
"strconv"
+ "strings"
"sync"
"testing"
"time"
@@ -42,6 +43,49 @@ var (
endpoint = registry.Configuration().ClusterAddresses
)
+func TestInitCluster(t *testing.T) {
+ registry.Configuration().ClusterAddresses = "127.0.0.1:2379"
+ registry.Configuration().InitClusters()
+ if strings.Join(registry.Configuration().RegistryAddresses(), ",") !=
"127.0.0.1:2379" {
+ t.Fatalf("TestInitCluster failed, %v",
registry.Configuration().RegistryAddresses())
+ }
+ registry.Configuration().ClusterAddresses =
"127.0.0.1:2379,127.0.0.2:2379"
+ registry.Configuration().InitClusters()
+ if strings.Join(registry.Configuration().RegistryAddresses(), ",") !=
"127.0.0.1:2379,127.0.0.2:2379" {
+ t.Fatalf("TestInitCluster failed, %v",
registry.Configuration().RegistryAddresses())
+ }
+ registry.Configuration().ClusterName = "sc-0"
+ registry.Configuration().ClusterAddresses =
"sc-0=127.0.0.1:2379,127.0.0.2:2379"
+ registry.Configuration().InitClusters()
+ if strings.Join(registry.Configuration().RegistryAddresses(), ",") !=
"127.0.0.1:2379,127.0.0.2:2379" {
+ t.Fatalf("TestInitCluster failed, %v",
registry.Configuration().RegistryAddresses())
+ }
+ if strings.Join(registry.Configuration().Clusters["sc-0"], ",") !=
"127.0.0.1:2379,127.0.0.2:2379" {
+ t.Fatalf("TestInitCluster failed, %v",
registry.Configuration().RegistryAddresses())
+ }
+ registry.Configuration().ClusterName = "sc-0"
+ registry.Configuration().ClusterAddresses =
"sc-1=127.0.0.1:2379,127.0.0.2:2379,sc-2=127.0.0.3:2379"
+ registry.Configuration().InitClusters()
+ if strings.Join(registry.Configuration().RegistryAddresses(), ",") !=
"" {
+ t.Fatalf("TestInitCluster failed, %v",
registry.Configuration().RegistryAddresses())
+ }
+ if strings.Join(registry.Configuration().Clusters["sc-1"], ",") !=
"127.0.0.1:2379,127.0.0.2:2379" {
+ t.Fatalf("TestInitCluster failed, %v",
registry.Configuration().RegistryAddresses())
+ }
+ if strings.Join(registry.Configuration().Clusters["sc-2"], ",") !=
"127.0.0.3:2379" {
+ t.Fatalf("TestInitCluster failed, %v",
registry.Configuration().RegistryAddresses())
+ }
+ registry.Configuration().ClusterName = "sc-0"
+ registry.Configuration().ClusterAddresses =
"sc-0=127.0.0.1:2379,sc-1=127.0.0.3:2379,127.0.0.4:2379"
+ registry.Configuration().InitClusters()
+ if strings.Join(registry.Configuration().RegistryAddresses(), ",") !=
"127.0.0.1:2379" {
+ t.Fatalf("TestInitCluster failed, %v",
registry.Configuration().RegistryAddresses())
+ }
+ if strings.Join(registry.Configuration().Clusters["sc-1"], ",") !=
"127.0.0.3:2379,127.0.0.4:2379" {
+ t.Fatalf("TestInitCluster failed, %v",
registry.Configuration().RegistryAddresses())
+ }
+}
+
func TestEtcdClient(t *testing.T) {
etcd := &EtcdClient{
Endpoints: []string{endpoint},