This is an automated email from the ASF dual-hosted git repository. zenlin pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/servicecomb-service-center.git
commit 3c2ca3392058360796f88004f531dd7eaaa02a4c Author: chinx <[email protected]> AuthorDate: Thu Jun 20 18:11:55 2019 +0800 Correct nodeName to clusterName as appropriate --- syncer/etcd/storage.go | 20 ++--- syncer/pkg/mock/mocksotrage/storage.go | 16 ++-- syncer/proto/event.pb.go | 119 ----------------------------- syncer/proto/event.proto | 15 ---- syncer/proto/syncer.pb.go | 84 ++++++++++++-------- syncer/proto/syncer.proto | 2 +- syncer/serf/config.go | 4 +- syncer/server/handler.go | 35 +++------ syncer/servicecenter/servicecenter.go | 14 ++-- syncer/servicecenter/servicecenter_test.go | 14 ++-- 10 files changed, 100 insertions(+), 223 deletions(-) diff --git a/syncer/etcd/storage.go b/syncer/etcd/storage.go index 5d149f2..b1e4f7f 100644 --- a/syncer/etcd/storage.go +++ b/syncer/etcd/storage.go @@ -87,18 +87,18 @@ next: continue next } } - key := mappingsKey + "/" + entry.NodeName + "/" + entry.OrgInstanceID + key := mappingsKey + "/" + entry.ClusterName + "/" + entry.OrgInstanceID if _, err := s.client.Delete(context.Background(), key); err != nil { - log.Errorf(err, "Delete instance nodeName=%s instanceID=%s failed", entry.NodeName, entry.OrgInstanceID) + log.Errorf(err, "Delete instance clusterName=%s instanceID=%s failed", entry.ClusterName, entry.OrgInstanceID) } } } -// UpdateMapByNode update map to storage by nodeName of other node -func (s *storage) UpdateMapByNode(nodeName string, mapping pb.SyncMapping) { +// UpdateMapByNode update map to storage by clusterName of other node +func (s *storage) UpdateMapByNode(clusterName string, mapping pb.SyncMapping) { newMaps := make(pb.SyncMapping, 0, len(mapping)) for _, val := range mapping { - key := mappingsKey + "/" + nodeName + "/" + val.OrgInstanceID + key := mappingsKey + "/" + clusterName + "/" + val.OrgInstanceID data, err := proto.Marshal(val) if err != nil { log.Errorf(err, "Proto marshal failed: %s", err) @@ -110,13 +110,13 @@ func (s *storage) UpdateMapByNode(nodeName string, mapping pb.SyncMapping) { } newMaps = append(newMaps, val) } - s.cleanExpired(s.GetMapByNode(nodeName), newMaps) + s.cleanExpired(s.GetMapByNode(clusterName), newMaps) } -// GetMapByNode get map by nodeName of other node -func (s *storage) GetMapByNode(nodeName string) (mapping pb.SyncMapping) { +// GetMapByNode get map by clusterName of other node +func (s *storage) GetMapByNode(clusterName string) (mapping pb.SyncMapping) { maps := make(pb.SyncMapping, 0, 10) - s.getPrefixKey(mappingsKey+"/"+nodeName, func(key, val []byte) (next bool) { + s.getPrefixKey(mappingsKey+"/"+clusterName, func(key, val []byte) (next bool) { next = true item := &pb.MappingEntry{} if err := proto.Unmarshal(val, item); err != nil { @@ -135,7 +135,7 @@ func (s *storage) UpdateMaps(maps pb.SyncMapping) { srcMaps := s.GetMaps() mappings := make(pb.SyncMapping, 0, len(maps)) for _, val := range maps { - key := mappingsKey + "/" + val.NodeName + "/" + val.OrgInstanceID + key := mappingsKey + "/" + val.ClusterName + "/" + val.OrgInstanceID data, err := proto.Marshal(val) if err != nil { log.Errorf(err, "Proto marshal failed: %s", err) diff --git a/syncer/pkg/mock/mocksotrage/storage.go b/syncer/pkg/mock/mocksotrage/storage.go index e72366c..36a1685 100644 --- a/syncer/pkg/mock/mocksotrage/storage.go +++ b/syncer/pkg/mock/mocksotrage/storage.go @@ -57,17 +57,17 @@ func (s *Storage) GetData() (data *pb.SyncData) { return } -// UpdateMapByNode update map to storage by nodeName of other node -func (s *Storage) UpdateMapByNode(nodeName string, mapping pb.SyncMapping) { +// UpdateMapByNode update map to storage by clusterName of other node +func (s *Storage) UpdateMapByNode(clusterName string, mapping pb.SyncMapping) { s.lock.Lock() - s.maps[nodeName] = mapping + s.maps[clusterName] = mapping s.lock.Unlock() } -// GetMapByNode get map by nodeName of other node -func (s *Storage) GetMapByNode(nodeName string) (mapping pb.SyncMapping) { +// GetMapByNode get map by clusterName of other node +func (s *Storage) GetMapByNode(clusterName string) (mapping pb.SyncMapping) { s.lock.RLock() - data, ok := s.maps[nodeName] + data, ok := s.maps[clusterName] if !ok { data = defaultMapping } @@ -79,12 +79,12 @@ func (s *Storage) UpdateMaps(maps pb.SyncMapping) { s.lock.Lock() mappings := make(map[string]pb.SyncMapping) for _, item := range maps { - mapping, ok := mappings[item.NodeName] + mapping, ok := mappings[item.ClusterName] if !ok { mapping = make(pb.SyncMapping, 0, 10) } mapping = append(mapping, item) - mappings[item.NodeName] = mapping + mappings[item.ClusterName] = mapping } s.maps = mappings s.lock.Unlock() diff --git a/syncer/proto/event.pb.go b/syncer/proto/event.pb.go deleted file mode 100644 index 8b44a52..0000000 --- a/syncer/proto/event.pb.go +++ /dev/null @@ -1,119 +0,0 @@ -// Code generated by protoc-gen-go. DO NOT EDIT. -// source: event.proto - -/* -Package proto is a generated protocol buffer package. - -It is generated from these files: - event.proto - syncer.proto - -It has these top-level messages: - Member - Discover - PullRequest - SyncService - SyncData - MapEntry -*/ -package proto - -import proto1 "github.com/golang/protobuf/proto" -import fmt "fmt" -import math "math" - -// Reference imports to suppress errors if they are not otherwise used. -var _ = proto1.Marshal -var _ = fmt.Errorf -var _ = math.Inf - -// This is a compile-time assertion to ensure that this generated file -// is compatible with the proto package it is being compiled against. -// A compilation error at this line likely means your copy of the -// proto package needs to be updated. -const _ = proto1.ProtoPackageIsVersion2 // please upgrade the proto package - -type Member struct { - NodeName string `protobuf:"bytes,1,opt,name=nodeName" json:"nodeName,omitempty"` - RPCPort int32 `protobuf:"varint,2,opt,name=RPCPort" json:"RPCPort,omitempty"` - Time string `protobuf:"bytes,3,opt,name=time" json:"time,omitempty"` -} - -func (m *Member) Reset() { *m = Member{} } -func (m *Member) String() string { return proto1.CompactTextString(m) } -func (*Member) ProtoMessage() {} -func (*Member) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{0} } - -func (m *Member) GetNodeName() string { - if m != nil { - return m.NodeName - } - return "" -} - -func (m *Member) GetRPCPort() int32 { - if m != nil { - return m.RPCPort - } - return 0 -} - -func (m *Member) GetTime() string { - if m != nil { - return m.Time - } - return "" -} - -type Discover struct { - ServiceName string `protobuf:"bytes,1,opt,name=serviceName" json:"serviceName,omitempty"` - Options string `protobuf:"bytes,2,opt,name=options" json:"options,omitempty"` - Time string `protobuf:"bytes,3,opt,name=time" json:"time,omitempty"` -} - -func (m *Discover) Reset() { *m = Discover{} } -func (m *Discover) String() string { return proto1.CompactTextString(m) } -func (*Discover) ProtoMessage() {} -func (*Discover) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{1} } - -func (m *Discover) GetServiceName() string { - if m != nil { - return m.ServiceName - } - return "" -} - -func (m *Discover) GetOptions() string { - if m != nil { - return m.Options - } - return "" -} - -func (m *Discover) GetTime() string { - if m != nil { - return m.Time - } - return "" -} - -func init() { - proto1.RegisterType((*Member)(nil), "proto.Member") - proto1.RegisterType((*Discover)(nil), "proto.Discover") -} - -func init() { proto1.RegisterFile("event.proto", fileDescriptor0) } - -var fileDescriptor0 = []byte{ - // 154 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0xe2, 0x4e, 0x2d, 0x4b, 0xcd, - 0x2b, 0xd1, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0x62, 0x05, 0x53, 0x4a, 0x41, 0x5c, 0x6c, 0xbe, - 0xa9, 0xb9, 0x49, 0xa9, 0x45, 0x42, 0x52, 0x5c, 0x1c, 0x79, 0xf9, 0x29, 0xa9, 0x7e, 0x89, 0xb9, - 0xa9, 0x12, 0x8c, 0x0a, 0x8c, 0x1a, 0x9c, 0x41, 0x70, 0xbe, 0x90, 0x04, 0x17, 0x7b, 0x50, 0x80, - 0x73, 0x40, 0x7e, 0x51, 0x89, 0x04, 0x93, 0x02, 0xa3, 0x06, 0x6b, 0x10, 0x8c, 0x2b, 0x24, 0xc4, - 0xc5, 0x52, 0x92, 0x99, 0x9b, 0x2a, 0xc1, 0x0c, 0xd6, 0x01, 0x66, 0x2b, 0x45, 0x71, 0x71, 0xb8, - 0x64, 0x16, 0x27, 0xe7, 0x97, 0xa5, 0x16, 0x09, 0x29, 0x70, 0x71, 0x17, 0xa7, 0x16, 0x95, 0x65, - 0x26, 0x23, 0x1b, 0x8c, 0x2c, 0x04, 0x32, 0x3b, 0xbf, 0xa0, 0x24, 0x33, 0x3f, 0xaf, 0x18, 0x6c, - 0x36, 0x67, 0x10, 0x8c, 0x8b, 0xcd, 0xec, 0x24, 0x36, 0xb0, 0xb3, 0x8d, 0x01, 0x01, 0x00, 0x00, - 0xff, 0xff, 0x00, 0xd3, 0xab, 0xa6, 0xcc, 0x00, 0x00, 0x00, -} diff --git a/syncer/proto/event.proto b/syncer/proto/event.proto deleted file mode 100644 index 65aed26..0000000 --- a/syncer/proto/event.proto +++ /dev/null @@ -1,15 +0,0 @@ -syntax = "proto3"; - -package proto; - -message Member { - string nodeName = 1; - int32 RPCPort = 2; - string time = 3; -} - -message Discover { - string serviceName = 1; - string options = 2; - string time = 3; -} \ No newline at end of file diff --git a/syncer/proto/syncer.pb.go b/syncer/proto/syncer.pb.go index 58fe17a..7d502a2 100644 --- a/syncer/proto/syncer.pb.go +++ b/syncer/proto/syncer.pb.go @@ -1,6 +1,20 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // source: syncer.proto +/* +Package proto is a generated protocol buffer package. + +import "../../server/core/proto/services.proto"; + +It is generated from these files: + syncer.proto + +It has these top-level messages: + PullRequest + SyncService + SyncData + MappingEntry +*/ package proto import proto1 "github.com/golang/protobuf/proto" @@ -18,6 +32,12 @@ var _ = proto1.Marshal var _ = fmt.Errorf var _ = math.Inf +// This is a compile-time assertion to ensure that this generated file +// is compatible with the proto package it is being compiled against. +// A compilation error at this line likely means your copy of the +// proto package needs to be updated. +const _ = proto1.ProtoPackageIsVersion2 // please upgrade the proto package + type PullRequest struct { ServiceName string `protobuf:"bytes,1,opt,name=serviceName" json:"serviceName,omitempty"` Options string `protobuf:"bytes,2,opt,name=options" json:"options,omitempty"` @@ -27,7 +47,7 @@ type PullRequest struct { func (m *PullRequest) Reset() { *m = PullRequest{} } func (m *PullRequest) String() string { return proto1.CompactTextString(m) } func (*PullRequest) ProtoMessage() {} -func (*PullRequest) Descriptor() ([]byte, []int) { return fileDescriptor1, []int{0} } +func (*PullRequest) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{0} } func (m *PullRequest) GetServiceName() string { if m != nil { @@ -59,7 +79,7 @@ type SyncService struct { func (m *SyncService) Reset() { *m = SyncService{} } func (m *SyncService) String() string { return proto1.CompactTextString(m) } func (*SyncService) ProtoMessage() {} -func (*SyncService) Descriptor() ([]byte, []int) { return fileDescriptor1, []int{1} } +func (*SyncService) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{1} } func (m *SyncService) GetDomainProject() string { if m != nil { @@ -89,7 +109,7 @@ type SyncData struct { func (m *SyncData) Reset() { *m = SyncData{} } func (m *SyncData) String() string { return proto1.CompactTextString(m) } func (*SyncData) ProtoMessage() {} -func (*SyncData) Descriptor() ([]byte, []int) { return fileDescriptor1, []int{2} } +func (*SyncData) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{2} } func (m *SyncData) GetServices() []*SyncService { if m != nil { @@ -99,7 +119,7 @@ func (m *SyncData) GetServices() []*SyncService { } type MappingEntry struct { - NodeName string `protobuf:"bytes,1,opt,name=nodeName" json:"nodeName,omitempty"` + ClusterName string `protobuf:"bytes,1,opt,name=clusterName" json:"clusterName,omitempty"` DomainProject string `protobuf:"bytes,2,opt,name=domainProject" json:"domainProject,omitempty"` OrgServiceID string `protobuf:"bytes,3,opt,name=orgServiceID" json:"orgServiceID,omitempty"` OrgInstanceID string `protobuf:"bytes,4,opt,name=orgInstanceID" json:"orgInstanceID,omitempty"` @@ -110,11 +130,11 @@ type MappingEntry struct { func (m *MappingEntry) Reset() { *m = MappingEntry{} } func (m *MappingEntry) String() string { return proto1.CompactTextString(m) } func (*MappingEntry) ProtoMessage() {} -func (*MappingEntry) Descriptor() ([]byte, []int) { return fileDescriptor1, []int{3} } +func (*MappingEntry) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{3} } -func (m *MappingEntry) GetNodeName() string { +func (m *MappingEntry) GetClusterName() string { if m != nil { - return m.NodeName + return m.ClusterName } return "" } @@ -233,29 +253,29 @@ var _Sync_serviceDesc = grpc.ServiceDesc{ Metadata: "syncer.proto", } -func init() { proto1.RegisterFile("syncer.proto", fileDescriptor1) } - -var fileDescriptor1 = []byte{ - // 335 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x6c, 0x92, 0xbd, 0x6e, 0xc2, 0x30, - 0x14, 0x85, 0x1b, 0x08, 0x7f, 0x37, 0xb4, 0x95, 0x6e, 0x17, 0x8b, 0x2e, 0x28, 0xea, 0xc0, 0x02, - 0x03, 0x55, 0x87, 0x76, 0xa6, 0x03, 0x03, 0x15, 0x0a, 0x73, 0x87, 0xd4, 0x58, 0xc8, 0x15, 0xd8, - 0xa9, 0xed, 0x54, 0xe2, 0x61, 0xfa, 0x76, 0x7d, 0x90, 0xca, 0x8e, 0x03, 0x8e, 0xca, 0x94, 0xf8, - 0xe4, 0xe4, 0xbb, 0xf7, 0x1c, 0x19, 0x86, 0xfa, 0x28, 0x28, 0x53, 0xb3, 0x42, 0x49, 0x23, 0xb1, - 0xe3, 0x1e, 0xa3, 0x1b, 0xcd, 0xd4, 0x37, 0xa7, 0x4c, 0x57, 0x72, 0xfa, 0x0e, 0xc9, 0xba, 0xdc, - 0xef, 0x33, 0xf6, 0x55, 0x32, 0x6d, 0x70, 0x0c, 0x89, 0x37, 0xbc, 0xe5, 0x07, 0x46, 0xa2, 0x71, - 0x34, 0x19, 0x64, 0xa1, 0x84, 0x04, 0x7a, 0xb2, 0x30, 0x5c, 0x0a, 0x4d, 0x5a, 0xee, 0x6b, 0x7d, - 0x44, 0x84, 0xd8, 0xf0, 0x03, 0x23, 0x6d, 0x27, 0xbb, 0xf7, 0xf4, 0x27, 0x82, 0x64, 0x73, 0x14, - 0x74, 0x53, 0x11, 0xf0, 0x01, 0xae, 0xb7, 0xf2, 0x90, 0x73, 0xb1, 0x56, 0xf2, 0x93, 0x51, 0xe3, - 0x27, 0x34, 0x45, 0x9c, 0x42, 0xcf, 0x8f, 0x74, 0x33, 0x92, 0xf9, 0x5d, 0xb5, 0xed, 0x6c, 0xc5, - 0xa9, 0x92, 0x9e, 0x95, 0xd5, 0x1e, 0x7c, 0x86, 0x01, 0x17, 0xda, 0xe4, 0x82, 0x32, 0x4d, 0xda, - 0xe3, 0xf6, 0x24, 0x99, 0xdf, 0x5f, 0xf8, 0x61, 0xe9, 0x3d, 0xd9, 0xd9, 0x9d, 0xbe, 0x40, 0xdf, - 0xae, 0xb7, 0xc8, 0x4d, 0x8e, 0x33, 0xe8, 0xd7, 0xe5, 0x90, 0xc8, 0x51, 0xd0, 0x53, 0x82, 0x04, - 0xd9, 0xc9, 0x93, 0xfe, 0x46, 0x30, 0x5c, 0xe5, 0x45, 0xc1, 0xc5, 0xee, 0x55, 0x18, 0x75, 0xc4, - 0x11, 0xf4, 0x85, 0xdc, 0x86, 0xcd, 0x9d, 0xce, 0xff, 0x83, 0xb7, 0x2e, 0x05, 0x4f, 0x61, 0x28, - 0xd5, 0xae, 0xde, 0x77, 0xe1, 0xab, 0x6c, 0x68, 0x96, 0x24, 0xd5, 0xae, 0x0e, 0xb3, 0x5c, 0x90, - 0xb8, 0x22, 0x35, 0x44, 0x4b, 0xa2, 0xa5, 0x3a, 0x93, 0x3a, 0x15, 0x29, 0xd4, 0x2c, 0x89, 0x96, - 0x2a, 0x20, 0x75, 0x2b, 0x52, 0x43, 0x9c, 0x3f, 0x41, 0x6c, 0xf3, 0xe3, 0x14, 0x62, 0x7b, 0x53, - 0xb0, 0x2e, 0x25, 0xb8, 0x36, 0xa3, 0xdb, 0xa0, 0x28, 0xdb, 0x65, 0x7a, 0xf5, 0xd1, 0x75, 0xca, - 0xe3, 0x5f, 0x00, 0x00, 0x00, 0xff, 0xff, 0xf7, 0xdc, 0x31, 0x80, 0x86, 0x02, 0x00, 0x00, +func init() { proto1.RegisterFile("syncer.proto", fileDescriptor0) } + +var fileDescriptor0 = []byte{ + // 336 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x6c, 0x92, 0xbf, 0x4e, 0xc3, 0x30, + 0x10, 0x87, 0x49, 0x9b, 0xfe, 0xbb, 0x14, 0x90, 0x8e, 0xc5, 0x2a, 0x4b, 0x15, 0x31, 0x74, 0x69, + 0x87, 0x22, 0x06, 0x98, 0xcb, 0xd0, 0xa1, 0xa8, 0x4a, 0x67, 0x86, 0x60, 0xac, 0xca, 0xa8, 0xb5, + 0x83, 0xed, 0x20, 0xe5, 0x61, 0x78, 0x41, 0x9e, 0x02, 0xd9, 0x71, 0xa8, 0x23, 0x3a, 0x25, 0xf9, + 0x72, 0xfe, 0xee, 0xee, 0x27, 0xc3, 0x58, 0x57, 0x82, 0x32, 0xb5, 0x28, 0x94, 0x34, 0x12, 0x7b, + 0xee, 0x31, 0xb9, 0xd2, 0x4c, 0x7d, 0x71, 0xca, 0x74, 0x8d, 0xd3, 0x57, 0x48, 0xb6, 0xe5, 0xe1, + 0x90, 0xb1, 0xcf, 0x92, 0x69, 0x83, 0x53, 0x48, 0x7c, 0xc1, 0x4b, 0x7e, 0x64, 0x24, 0x9a, 0x46, + 0xb3, 0x51, 0x16, 0x22, 0x24, 0x30, 0x90, 0x85, 0xe1, 0x52, 0x68, 0xd2, 0x71, 0x7f, 0x9b, 0x4f, + 0x44, 0x88, 0x0d, 0x3f, 0x32, 0xd2, 0x75, 0xd8, 0xbd, 0xa7, 0xdf, 0x11, 0x24, 0xbb, 0x4a, 0xd0, + 0x5d, 0x6d, 0xc0, 0x3b, 0xb8, 0x7c, 0x97, 0xc7, 0x9c, 0x8b, 0xad, 0x92, 0x1f, 0x8c, 0x1a, 0xdf, + 0xa1, 0x0d, 0x71, 0x0e, 0x03, 0xdf, 0xd2, 0xf5, 0x48, 0x96, 0x37, 0xf5, 0xb4, 0x8b, 0x0d, 0xa7, + 0x4a, 0x7a, 0x57, 0xd6, 0xd4, 0xe0, 0x23, 0x8c, 0xb8, 0xd0, 0x26, 0x17, 0x94, 0x69, 0xd2, 0x9d, + 0x76, 0x67, 0xc9, 0xf2, 0xf6, 0xcc, 0x81, 0xb5, 0xaf, 0xc9, 0x4e, 0xd5, 0xe9, 0x13, 0x0c, 0xed, + 0x78, 0xab, 0xdc, 0xe4, 0xb8, 0x80, 0x61, 0x13, 0x0e, 0x89, 0x9c, 0x05, 0xbd, 0x25, 0xd8, 0x20, + 0xfb, 0xab, 0x49, 0x7f, 0x22, 0x18, 0x6f, 0xf2, 0xa2, 0xe0, 0x62, 0xff, 0x2c, 0x8c, 0xaa, 0x6c, + 0x78, 0xf4, 0x50, 0x6a, 0xc3, 0x54, 0x18, 0x5e, 0x80, 0xfe, 0xaf, 0xdf, 0x39, 0xb7, 0x7e, 0x0a, + 0x63, 0xa9, 0xf6, 0xcd, 0xd4, 0x2b, 0x1f, 0x68, 0x8b, 0x59, 0x93, 0x54, 0xfb, 0x66, 0xa5, 0xf5, + 0x8a, 0xc4, 0xb5, 0xa9, 0x05, 0xad, 0x89, 0x96, 0xea, 0x64, 0xea, 0xd5, 0xa6, 0x90, 0x59, 0x13, + 0x2d, 0x55, 0x60, 0xea, 0xd7, 0xa6, 0x16, 0x5c, 0x3e, 0x40, 0x6c, 0x53, 0xc0, 0x39, 0xc4, 0xf6, + 0xbe, 0x60, 0x13, 0x4d, 0x70, 0x79, 0x26, 0xd7, 0x41, 0x5c, 0x36, 0xd1, 0xf4, 0xe2, 0xad, 0xef, + 0xc8, 0xfd, 0x6f, 0x00, 0x00, 0x00, 0xff, 0xff, 0xfa, 0x2b, 0x2b, 0x6b, 0x8c, 0x02, 0x00, 0x00, } diff --git a/syncer/proto/syncer.proto b/syncer/proto/syncer.proto index 18d5be3..103e82c 100644 --- a/syncer/proto/syncer.proto +++ b/syncer/proto/syncer.proto @@ -24,7 +24,7 @@ message SyncData { } message MappingEntry { - string nodeName = 1; + string clusterName = 1; string domainProject = 2; string orgServiceID = 3; string orgInstanceID = 4; diff --git a/syncer/serf/config.go b/syncer/serf/config.go index 0ca8c70..8a25908 100644 --- a/syncer/serf/config.go +++ b/syncer/serf/config.go @@ -36,6 +36,7 @@ const ( groupExpect = 3 tagKeyClusterName = "syncer-cluster-name" TagKeyClusterPort = "syncer-cluster-port" + TagKeyRPCPort = "syncer-rpc-port" ) // DefaultConfig default config @@ -95,9 +96,10 @@ func (c *Config) convertToSerf() (*serf.Config, error) { serfConf.MemberlistConfig.BindAddr = bindIP serfConf.MemberlistConfig.BindPort = bindPort serfConf.NodeName = c.NodeName + serfConf.Tags = map[string]string{TagKeyRPCPort: strconv.Itoa(c.RPCPort)} if c.ClusterName != "" { - serfConf.Tags = map[string]string{tagKeyClusterName: c.ClusterName} + serfConf.Tags[tagKeyClusterName] = c.ClusterName serfConf.Tags[TagKeyClusterPort] = strconv.Itoa(c.ClusterPort) } diff --git a/syncer/server/handler.go b/syncer/server/handler.go index 0aa0c92..8760a83 100644 --- a/syncer/server/handler.go +++ b/syncer/server/handler.go @@ -19,13 +19,13 @@ package server import ( "context" "fmt" - "time" "github.com/apache/servicecomb-service-center/pkg/log" + "github.com/apache/servicecomb-service-center/pkg/util" "github.com/apache/servicecomb-service-center/syncer/grpc" pb "github.com/apache/servicecomb-service-center/syncer/proto" - "github.com/gogo/protobuf/proto" "github.com/hashicorp/serf/serf" + myserf "github.com/apache/servicecomb-service-center/syncer/serf" ) const ( @@ -42,14 +42,8 @@ func (s *Server) tickHandler(ctx context.Context) { // Flush data to the storage of servicecenter s.servicecenter.FlushData() - event, _ := proto.Marshal(&pb.Member{ - NodeName: s.conf.ClusterName, - RPCPort: int32(s.conf.RPCPort), - Time: fmt.Sprintf("%d", time.Now().UTC().Second()), - }) - // sends a UserEvent on Serf, the event will be broadcast between members - err := s.agent.UserEvent(EventDiscovered, event, true) + err := s.agent.UserEvent(EventDiscovered, util.StringToBytesWithNoCopy(s.conf.ClusterName), true) if err != nil { log.Errorf(err, "Syncer send user event failed") } @@ -81,36 +75,31 @@ func (s *Server) HandleEvent(event serf.Event) { // userEvent Handles "EventUser" notification events, no response required func (s *Server) userEvent(event serf.UserEvent) { log.Debug("Receive serf user event") - m := &pb.Member{} - err := proto.Unmarshal(event.Payload, m) - if err != nil { - log.Errorf(err, "trigger user event '%s' handler failed", event.EventType()) - return - } + clusterName := util.BytesToStringWithNoCopy(event.Payload) // Excludes notifications from self, as the gossip protocol inevitably has redundant notifications - if s.conf.ClusterName == m.NodeName { + if s.conf.ClusterName == clusterName { return } // Get member information and get synchronized data from it - members := s.agent.GroupMembers(m.NodeName) - if members == nil || len(members) == 0{ - log.Warnf("serf member = %s is not found", m.NodeName) + members := s.agent.GroupMembers(clusterName) + if members == nil || len(members) == 0 { + log.Warnf("serf member = %s is not found", clusterName) return } // todo: grpc supports multi-address polling // Get dta from remote member - endpoint := fmt.Sprintf("%s:%d", members[0].Addr, m.RPCPort) - log.Debugf("Going to pull data from %s %s", m.NodeName, endpoint) + endpoint := fmt.Sprintf("%s:%s", members[0].Addr, members[0].Tags[myserf.TagKeyRPCPort]) + log.Debugf("Going to pull data from %s %s", members[0].Name, endpoint) data, err := grpc.Pull(context.Background(), endpoint) if err != nil { - log.Errorf(err, "Pull other serf instances failed, node name is '%s'", m.NodeName) + log.Errorf(err, "Pull other serf instances failed, node name is '%s'", members[0].Name) return } // Registry instances to servicecenter and update storage of it - s.servicecenter.Registry(m.NodeName, data) + s.servicecenter.Registry(clusterName, data) } // queryEvent Handles "EventQuery" query events and respond if conditions are met diff --git a/syncer/servicecenter/servicecenter.go b/syncer/servicecenter/servicecenter.go index 5e7c8c3..b6f5446 100644 --- a/syncer/servicecenter/servicecenter.go +++ b/syncer/servicecenter/servicecenter.go @@ -28,7 +28,7 @@ import ( type Servicecenter interface { SetStorage(storage Storage) FlushData() - Registry(nodeName string, data *pb.SyncData) + Registry(clusterName string, data *pb.SyncData) Discovery() *pb.SyncData } @@ -42,8 +42,8 @@ type Storage interface { UpdateData(data *pb.SyncData) GetMaps() (maps pb.SyncMapping) UpdateMaps(maps pb.SyncMapping) - GetMapByNode(nodeName string) (mapping pb.SyncMapping) - UpdateMapByNode(nodeName string, mapping pb.SyncMapping) + GetMapByNode(clusterName string) (mapping pb.SyncMapping) + UpdateMapByNode(clusterName string, mapping pb.SyncMapping) } // NewServicecenter new store with endpoints @@ -78,8 +78,8 @@ func (s *servicecenter) FlushData() { } // Registry registry data to the servicecenter, update mapping data -func (s *servicecenter) Registry(nodeName string, data *pb.SyncData) { - mapping := s.storage.GetMapByNode(nodeName) +func (s *servicecenter) Registry(clusterName string, data *pb.SyncData) { + mapping := s.storage.GetMapByNode(clusterName) for _, svc := range data.Services { log.Debugf("trying to do registration of service, serviceID = %s", svc.Service.ServiceId) // If the svc is in the mapping, just do nothing, if not, created it in servicecenter and get the new serviceID @@ -98,7 +98,7 @@ func (s *servicecenter) Registry(nodeName string, data *pb.SyncData) { OrgServiceID: inst.ServiceId, OrgInstanceID: inst.InstanceId, CurServiceID: svcID, - NodeName: nodeName, + ClusterName: clusterName, } item.CurInstanceID = s.registryInstances(svc.DomainProject, svcID, inst) @@ -111,7 +111,7 @@ func (s *servicecenter) Registry(nodeName string, data *pb.SyncData) { // UnRegistry instances that is not in the data which means the instance in the mapping is no longer actived mapping = s.unRegistryInstances(data, mapping) // Update mapping data of the node to the storage of the servicecenter - s.storage.UpdateMapByNode(nodeName, mapping) + s.storage.UpdateMapByNode(clusterName, mapping) } // Discovery discovery data from storage diff --git a/syncer/servicecenter/servicecenter_test.go b/syncer/servicecenter/servicecenter_test.go index 3c38b03..7036b9d 100644 --- a/syncer/servicecenter/servicecenter_test.go +++ b/syncer/servicecenter/servicecenter_test.go @@ -79,8 +79,8 @@ func TestOnEvent(t *testing.T) { return } - nodeName := "test_node" - dc.Registry(nodeName, data) + clusterName := "test_node" + dc.Registry(clusterName, data) mockplugin.SetGetAll(mockplugin.NewGetAll) dc.FlushData() @@ -90,25 +90,25 @@ func TestOnEvent(t *testing.T) { return } - dc.Registry(nodeName, newData) + dc.Registry(clusterName, newData) mockplugin.SetRegisterInstance(func(ctx context.Context, domainProject, serviceId string, instance *proto.MicroServiceInstance) (s string, e error) { return "", errors.New("test error") }) - dc.Registry(nodeName, data) + dc.Registry(clusterName, data) mockplugin.SetRegisterInstance(nil) - dc.Registry(nodeName, data) + dc.Registry(clusterName, data) - dc.Registry(nodeName, data) + dc.Registry(clusterName, data) mockplugin.SetHeartbeat(func(ctx context.Context, domainProject, serviceId, instanceId string) error { return errors.New("test error") }) - dc.Registry(nodeName, data) + dc.Registry(clusterName, data) } func initPlugin(conf *config.Config) {
