This is an automated email from the ASF dual-hosted git repository.

littlecui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/servicecomb-service-center.git


The following commit(s) were added to refs/heads/master by this push:
     new c63d0ba  [feat]add health api in sync package (#1205)
c63d0ba is described below

commit c63d0ba1e4d77b70aef686dce5120d5dab953897
Author: xiaoluoluo <[email protected]>
AuthorDate: Sat Jan 8 17:19:06 2022 +0800

    [feat]add health api in sync package (#1205)
    
    Co-authored-by: [email protected] <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>
---
 api/sync/v1/event_service.pb.go      | 167 ++++++++++++++++++++++++++++++-----
 api/sync/v1/event_service.proto      |   8 ++
 api/sync/v1/event_service_grpc.pb.go |  36 ++++++++
 client/set.go                        |  18 +---
 client/set_test.go                   |  14 ++-
 etc/conf/syncer.yaml                 |   9 +-
 pkg/rpc/client.go                    |  71 +++++++++++++++
 server/rpc/sync/server.go            |  15 ++++
 syncer/config/config.go              |  16 +++-
 syncer/config/config_test.go         |  11 +++
 syncer/resource/admin/admin.go       |  23 ++++-
 syncer/service/admin/health.go       |  95 ++++++++++++++++++++
 syncer/service/admin/health_test.go  | 118 +++++++++++++++++++++++++
 13 files changed, 547 insertions(+), 54 deletions(-)

diff --git a/api/sync/v1/event_service.pb.go b/api/sync/v1/event_service.pb.go
index 5463cc1..5cc3994 100644
--- a/api/sync/v1/event_service.pb.go
+++ b/api/sync/v1/event_service.pb.go
@@ -248,6 +248,91 @@ func (x *Result) GetMessage() string {
        return ""
 }
 
+type HealthRequest struct {
+       state         protoimpl.MessageState
+       sizeCache     protoimpl.SizeCache
+       unknownFields protoimpl.UnknownFields
+}
+
+func (x *HealthRequest) Reset() {
+       *x = HealthRequest{}
+       if protoimpl.UnsafeEnabled {
+               mi := &file_event_service_proto_msgTypes[4]
+               ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+               ms.StoreMessageInfo(mi)
+       }
+}
+
+func (x *HealthRequest) String() string {
+       return protoimpl.X.MessageStringOf(x)
+}
+
+func (*HealthRequest) ProtoMessage() {}
+
+func (x *HealthRequest) ProtoReflect() protoreflect.Message {
+       mi := &file_event_service_proto_msgTypes[4]
+       if protoimpl.UnsafeEnabled && x != nil {
+               ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+               if ms.LoadMessageInfo() == nil {
+                       ms.StoreMessageInfo(mi)
+               }
+               return ms
+       }
+       return mi.MessageOf(x)
+}
+
+// Deprecated: Use HealthRequest.ProtoReflect.Descriptor instead.
+func (*HealthRequest) Descriptor() ([]byte, []int) {
+       return file_event_service_proto_rawDescGZIP(), []int{4}
+}
+
+type HealthReply struct {
+       state         protoimpl.MessageState
+       sizeCache     protoimpl.SizeCache
+       unknownFields protoimpl.UnknownFields
+
+       Status string `protobuf:"bytes,1,opt,name=status,proto3" 
json:"status,omitempty"`
+}
+
+func (x *HealthReply) Reset() {
+       *x = HealthReply{}
+       if protoimpl.UnsafeEnabled {
+               mi := &file_event_service_proto_msgTypes[5]
+               ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+               ms.StoreMessageInfo(mi)
+       }
+}
+
+func (x *HealthReply) String() string {
+       return protoimpl.X.MessageStringOf(x)
+}
+
+func (*HealthReply) ProtoMessage() {}
+
+func (x *HealthReply) ProtoReflect() protoreflect.Message {
+       mi := &file_event_service_proto_msgTypes[5]
+       if protoimpl.UnsafeEnabled && x != nil {
+               ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+               if ms.LoadMessageInfo() == nil {
+                       ms.StoreMessageInfo(mi)
+               }
+               return ms
+       }
+       return mi.MessageOf(x)
+}
+
+// Deprecated: Use HealthReply.ProtoReflect.Descriptor instead.
+func (*HealthReply) Descriptor() ([]byte, []int) {
+       return file_event_service_proto_rawDescGZIP(), []int{5}
+}
+
+func (x *HealthReply) GetStatus() string {
+       if x != nil {
+               return x.Status
+       }
+       return ""
+}
+
 var File_event_service_proto protoreflect.FileDescriptor
 
 var file_event_service_proto_rawDesc = []byte{
@@ -282,16 +367,24 @@ var file_event_service_proto_rawDesc = []byte{
        0x38, 0x01, 0x22, 0x36, 0x0a, 0x06, 0x52, 0x65, 0x73, 0x75, 0x6c, 0x74, 
0x12, 0x12, 0x0a, 0x04,
        0x63, 0x6f, 0x64, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x05, 0x52, 0x04, 
0x63, 0x6f, 0x64, 0x65,
        0x12, 0x18, 0x0a, 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x18, 
0x02, 0x20, 0x01, 0x28,
-       0x09, 0x52, 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x32, 0x46, 
0x0a, 0x0c, 0x45, 0x76,
-       0x65, 0x6e, 0x74, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x12, 0x36, 
0x0a, 0x04, 0x53, 0x79,
-       0x6e, 0x63, 0x12, 0x16, 0x2e, 0x61, 0x70, 0x69, 0x2e, 0x73, 0x79, 0x6e, 
0x63, 0x2e, 0x76, 0x31,
-       0x2e, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x4c, 0x69, 0x73, 0x74, 0x1a, 0x14, 
0x2e, 0x61, 0x70, 0x69,
-       0x2e, 0x73, 0x79, 0x6e, 0x63, 0x2e, 0x76, 0x31, 0x2e, 0x52, 0x65, 0x73, 
0x75, 0x6c, 0x74, 0x73,
-       0x22, 0x00, 0x42, 0x3d, 0x5a, 0x3b, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 
0x2e, 0x63, 0x6f, 0x6d,
-       0x2f, 0x61, 0x70, 0x61, 0x63, 0x68, 0x65, 0x2f, 0x73, 0x65, 0x72, 0x76, 
0x69, 0x63, 0x65, 0x63,
-       0x6f, 0x6d, 0x62, 0x2d, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x2d, 
0x63, 0x65, 0x6e, 0x74,
-       0x65, 0x72, 0x2f, 0x61, 0x70, 0x69, 0x2f, 0x73, 0x79, 0x6e, 0x63, 0x2f, 
0x76, 0x31, 0x3b, 0x76,
-       0x31, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
+       0x09, 0x52, 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x22, 0x0f, 
0x0a, 0x0d, 0x48, 0x65,
+       0x61, 0x6c, 0x74, 0x68, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x22, 
0x25, 0x0a, 0x0b, 0x48,
+       0x65, 0x61, 0x6c, 0x74, 0x68, 0x52, 0x65, 0x70, 0x6c, 0x79, 0x12, 0x16, 
0x0a, 0x06, 0x73, 0x74,
+       0x61, 0x74, 0x75, 0x73, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 
0x73, 0x74, 0x61, 0x74,
+       0x75, 0x73, 0x32, 0x88, 0x01, 0x0a, 0x0c, 0x45, 0x76, 0x65, 0x6e, 0x74, 
0x53, 0x65, 0x72, 0x76,
+       0x69, 0x63, 0x65, 0x12, 0x36, 0x0a, 0x04, 0x53, 0x79, 0x6e, 0x63, 0x12, 
0x16, 0x2e, 0x61, 0x70,
+       0x69, 0x2e, 0x73, 0x79, 0x6e, 0x63, 0x2e, 0x76, 0x31, 0x2e, 0x45, 0x76, 
0x65, 0x6e, 0x74, 0x4c,
+       0x69, 0x73, 0x74, 0x1a, 0x14, 0x2e, 0x61, 0x70, 0x69, 0x2e, 0x73, 0x79, 
0x6e, 0x63, 0x2e, 0x76,
+       0x31, 0x2e, 0x52, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x73, 0x22, 0x00, 0x12, 
0x40, 0x0a, 0x06, 0x48,
+       0x65, 0x61, 0x6c, 0x74, 0x68, 0x12, 0x1a, 0x2e, 0x61, 0x70, 0x69, 0x2e, 
0x73, 0x79, 0x6e, 0x63,
+       0x2e, 0x76, 0x31, 0x2e, 0x48, 0x65, 0x61, 0x6c, 0x74, 0x68, 0x52, 0x65, 
0x71, 0x75, 0x65, 0x73,
+       0x74, 0x1a, 0x18, 0x2e, 0x61, 0x70, 0x69, 0x2e, 0x73, 0x79, 0x6e, 0x63, 
0x2e, 0x76, 0x31, 0x2e,
+       0x48, 0x65, 0x61, 0x6c, 0x74, 0x68, 0x52, 0x65, 0x70, 0x6c, 0x79, 0x22, 
0x00, 0x42, 0x3d, 0x5a,
+       0x3b, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 
0x61, 0x70, 0x61, 0x63,
+       0x68, 0x65, 0x2f, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x63, 0x6f, 
0x6d, 0x62, 0x2d, 0x73,
+       0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x2d, 0x63, 0x65, 0x6e, 0x74, 0x65, 
0x72, 0x2f, 0x61, 0x70,
+       0x69, 0x2f, 0x73, 0x79, 0x6e, 0x63, 0x2f, 0x76, 0x31, 0x3b, 0x76, 0x31, 
0x62, 0x06, 0x70, 0x72,
+       0x6f, 0x74, 0x6f, 0x33,
 }
 
 var (
@@ -306,24 +399,28 @@ func file_event_service_proto_rawDescGZIP() []byte {
        return file_event_service_proto_rawDescData
 }
 
-var file_event_service_proto_msgTypes = make([]protoimpl.MessageInfo, 6)
+var file_event_service_proto_msgTypes = make([]protoimpl.MessageInfo, 8)
 var file_event_service_proto_goTypes = []interface{}{
-       (*EventList)(nil), // 0: api.sync.v1.EventList
-       (*Event)(nil),     // 1: api.sync.v1.Event
-       (*Results)(nil),   // 2: api.sync.v1.Results
-       (*Result)(nil),    // 3: api.sync.v1.Result
-       nil,               // 4: api.sync.v1.Event.OptsEntry
-       nil,               // 5: api.sync.v1.Results.ResultsEntry
+       (*EventList)(nil),     // 0: api.sync.v1.EventList
+       (*Event)(nil),         // 1: api.sync.v1.Event
+       (*Results)(nil),       // 2: api.sync.v1.Results
+       (*Result)(nil),        // 3: api.sync.v1.Result
+       (*HealthRequest)(nil), // 4: api.sync.v1.HealthRequest
+       (*HealthReply)(nil),   // 5: api.sync.v1.HealthReply
+       nil,                   // 6: api.sync.v1.Event.OptsEntry
+       nil,                   // 7: api.sync.v1.Results.ResultsEntry
 }
 var file_event_service_proto_depIdxs = []int32{
        1, // 0: api.sync.v1.EventList.events:type_name -> api.sync.v1.Event
-       4, // 1: api.sync.v1.Event.opts:type_name -> api.sync.v1.Event.OptsEntry
-       5, // 2: api.sync.v1.Results.results:type_name -> 
api.sync.v1.Results.ResultsEntry
+       6, // 1: api.sync.v1.Event.opts:type_name -> api.sync.v1.Event.OptsEntry
+       7, // 2: api.sync.v1.Results.results:type_name -> 
api.sync.v1.Results.ResultsEntry
        3, // 3: api.sync.v1.Results.ResultsEntry.value:type_name -> 
api.sync.v1.Result
        0, // 4: api.sync.v1.EventService.Sync:input_type -> 
api.sync.v1.EventList
-       2, // 5: api.sync.v1.EventService.Sync:output_type -> 
api.sync.v1.Results
-       5, // [5:6] is the sub-list for method output_type
-       4, // [4:5] is the sub-list for method input_type
+       4, // 5: api.sync.v1.EventService.Health:input_type -> 
api.sync.v1.HealthRequest
+       2, // 6: api.sync.v1.EventService.Sync:output_type -> 
api.sync.v1.Results
+       5, // 7: api.sync.v1.EventService.Health:output_type -> 
api.sync.v1.HealthReply
+       6, // [6:8] is the sub-list for method output_type
+       4, // [4:6] is the sub-list for method input_type
        4, // [4:4] is the sub-list for extension type_name
        4, // [4:4] is the sub-list for extension extendee
        0, // [0:4] is the sub-list for field type_name
@@ -383,6 +480,30 @@ func file_event_service_proto_init() {
                                return nil
                        }
                }
+               file_event_service_proto_msgTypes[4].Exporter = func(v 
interface{}, i int) interface{} {
+                       switch v := v.(*HealthRequest); i {
+                       case 0:
+                               return &v.state
+                       case 1:
+                               return &v.sizeCache
+                       case 2:
+                               return &v.unknownFields
+                       default:
+                               return nil
+                       }
+               }
+               file_event_service_proto_msgTypes[5].Exporter = func(v 
interface{}, i int) interface{} {
+                       switch v := v.(*HealthReply); i {
+                       case 0:
+                               return &v.state
+                       case 1:
+                               return &v.sizeCache
+                       case 2:
+                               return &v.unknownFields
+                       default:
+                               return nil
+                       }
+               }
        }
        type x struct{}
        out := protoimpl.TypeBuilder{
@@ -390,7 +511,7 @@ func file_event_service_proto_init() {
                        GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
                        RawDescriptor: file_event_service_proto_rawDesc,
                        NumEnums:      0,
-                       NumMessages:   6,
+                       NumMessages:   8,
                        NumExtensions: 0,
                        NumServices:   1,
                },
diff --git a/api/sync/v1/event_service.proto b/api/sync/v1/event_service.proto
index 4f5eb30..e218722 100644
--- a/api/sync/v1/event_service.proto
+++ b/api/sync/v1/event_service.proto
@@ -18,6 +18,14 @@ message Result {
   int32  code  = 1; //reuse standard http code
   string message = 2;
 }
+
+message HealthRequest {
+}
+message HealthReply {
+  string status = 1;
+}
+
 service EventService {
   rpc Sync(EventList) returns (Results) {}
+  rpc Health(HealthRequest) returns (HealthReply) {}
 }
\ No newline at end of file
diff --git a/api/sync/v1/event_service_grpc.pb.go 
b/api/sync/v1/event_service_grpc.pb.go
index e32d7c0..42a89e7 100644
--- a/api/sync/v1/event_service_grpc.pb.go
+++ b/api/sync/v1/event_service_grpc.pb.go
@@ -19,6 +19,7 @@ const _ = grpc.SupportPackageIsVersion7
 // For semantics around ctx use and closing/ending streaming RPCs, please 
refer to 
https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream.
 type EventServiceClient interface {
        Sync(ctx context.Context, in *EventList, opts ...grpc.CallOption) 
(*Results, error)
+       Health(ctx context.Context, in *HealthRequest, opts ...grpc.CallOption) 
(*HealthReply, error)
 }
 
 type eventServiceClient struct {
@@ -38,11 +39,21 @@ func (c *eventServiceClient) Sync(ctx context.Context, in 
*EventList, opts ...gr
        return out, nil
 }
 
+func (c *eventServiceClient) Health(ctx context.Context, in *HealthRequest, 
opts ...grpc.CallOption) (*HealthReply, error) {
+       out := new(HealthReply)
+       err := c.cc.Invoke(ctx, "/api.sync.v1.EventService/Health", in, out, 
opts...)
+       if err != nil {
+               return nil, err
+       }
+       return out, nil
+}
+
 // EventServiceServer is the server API for EventService service.
 // All implementations must embed UnimplementedEventServiceServer
 // for forward compatibility
 type EventServiceServer interface {
        Sync(context.Context, *EventList) (*Results, error)
+       Health(context.Context, *HealthRequest) (*HealthReply, error)
        mustEmbedUnimplementedEventServiceServer()
 }
 
@@ -53,6 +64,9 @@ type UnimplementedEventServiceServer struct {
 func (UnimplementedEventServiceServer) Sync(context.Context, *EventList) 
(*Results, error) {
        return nil, status.Errorf(codes.Unimplemented, "method Sync not 
implemented")
 }
+func (UnimplementedEventServiceServer) Health(context.Context, *HealthRequest) 
(*HealthReply, error) {
+       return nil, status.Errorf(codes.Unimplemented, "method Health not 
implemented")
+}
 func (UnimplementedEventServiceServer) 
mustEmbedUnimplementedEventServiceServer() {}
 
 // UnsafeEventServiceServer may be embedded to opt out of forward 
compatibility for this service.
@@ -84,6 +98,24 @@ func _EventService_Sync_Handler(srv interface{}, ctx 
context.Context, dec func(i
        return interceptor(ctx, in, info, handler)
 }
 
+func _EventService_Health_Handler(srv interface{}, ctx context.Context, dec 
func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, 
error) {
+       in := new(HealthRequest)
+       if err := dec(in); err != nil {
+               return nil, err
+       }
+       if interceptor == nil {
+               return srv.(EventServiceServer).Health(ctx, in)
+       }
+       info := &grpc.UnaryServerInfo{
+               Server:     srv,
+               FullMethod: "/api.sync.v1.EventService/Health",
+       }
+       handler := func(ctx context.Context, req interface{}) (interface{}, 
error) {
+               return srv.(EventServiceServer).Health(ctx, 
req.(*HealthRequest))
+       }
+       return interceptor(ctx, in, info, handler)
+}
+
 // EventService_ServiceDesc is the grpc.ServiceDesc for EventService service.
 // It's only intended for direct use with grpc.RegisterService,
 // and not to be introspected or modified (even as a copy)
@@ -95,6 +127,10 @@ var EventService_ServiceDesc = grpc.ServiceDesc{
                        MethodName: "Sync",
                        Handler:    _EventService_Sync_Handler,
                },
+               {
+                       MethodName: "Health",
+                       Handler:    _EventService_Health_Handler,
+               },
        },
        Streams:  []grpc.StreamDesc{},
        Metadata: "event_service.proto",
diff --git a/client/set.go b/client/set.go
index 1797f04..93d2758 100644
--- a/client/set.go
+++ b/client/set.go
@@ -1,31 +1,17 @@
 package client
 
 import (
-       "fmt"
-
        v1sync "github.com/apache/servicecomb-service-center/api/sync/v1"
-       "github.com/apache/servicecomb-service-center/pkg/log"
        "google.golang.org/grpc"
 )
 
-// SetConfig is client configs
-type SetConfig struct {
-       Addr string
-}
-
 // Set is set of grpc clients
 type Set struct {
        EventServiceClient v1sync.EventServiceClient
 }
 
-// NewSetForConfig dial grpc connection and create all grpc clients
-func NewSetForConfig(c SetConfig) (*Set, error) {
-       conn, err := grpc.Dial(c.Addr, grpc.WithInsecure())
-       if err != nil {
-               log.Error(fmt.Sprintf("can not connect: %s", err), nil)
-               return nil, err
-       }
+func NewSet(conn *grpc.ClientConn) *Set {
        return &Set{
                EventServiceClient: v1sync.NewEventServiceClient(conn),
-       }, nil
+       }
 }
diff --git a/client/set_test.go b/client/set_test.go
index 6986695..dfb9593 100644
--- a/client/set_test.go
+++ b/client/set_test.go
@@ -6,15 +6,21 @@ import (
 
        v1sync "github.com/apache/servicecomb-service-center/api/sync/v1"
        "github.com/apache/servicecomb-service-center/client"
+       "github.com/apache/servicecomb-service-center/pkg/rpc"
        "github.com/stretchr/testify/assert"
 )
 
 func TestNewSetForConfig(t *testing.T) {
-       cs, err := client.NewSetForConfig(client.SetConfig{
-               Addr: "127.0.0.1:30105",
-       })
+       conn, err := rpc.GetPickFirstLbConn(
+               &rpc.Config{
+                       Addrs:       []string{"127.0.0.1:30105"},
+                       Scheme:      "test",
+                       ServiceName: "serviceName",
+               })
        assert.NoError(t, err)
-       _, err = cs.EventServiceClient.Sync(context.TODO(), 
&v1sync.EventList{Events: []*v1sync.Event{
+       defer conn.Close()
+       set := client.NewSet(conn)
+       _, err = set.EventServiceClient.Sync(context.TODO(), 
&v1sync.EventList{Events: []*v1sync.Event{
                {Action: "create"},
        }})
        assert.NoError(t, err)
diff --git a/etc/conf/syncer.yaml b/etc/conf/syncer.yaml
index fa20b18..30802d5 100644
--- a/etc/conf/syncer.yaml
+++ b/etc/conf/syncer.yaml
@@ -1,17 +1,14 @@
 sync:
-  enableOnStart: false
-  datacenter:
-    name: dc1
+  enableOnStart: true
   peers:
     - name: dc2
       kind: servicecomb
-      endpoints: ["https://127.0.0.1:30100";]
+      endpoints: ["127.0.0.1:30105"]
       # only allow mode implemented in incremental approach like push, 
watch(such as pub/sub, long polling)
       mode: [push]
-      caFile: certs/ca.crt
       revision: 100
     - name: dc3
       kind: consul
+      endpoints: []
       # since consul will not push data to servcecomb, if we need set push and 
watch mode to achieve two direction sync
       mode: [push,watch]
-      revison: 200
diff --git a/pkg/rpc/client.go b/pkg/rpc/client.go
new file mode 100644
index 0000000..7a8d8f4
--- /dev/null
+++ b/pkg/rpc/client.go
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package rpc
+
+import (
+       "errors"
+
+       "google.golang.org/grpc"
+       "google.golang.org/grpc/resolver"
+       "google.golang.org/grpc/resolver/manual"
+)
+
+var (
+       ErrAddrEmpty = errors.New("addr is empty")
+)
+
+type Config struct {
+       Addrs       []string
+       Scheme      string
+       ServiceName string
+}
+
+func GetPickFirstLbConn(config *Config) (*grpc.ClientConn, error) {
+       return getLbConn(config.Addrs, config.Scheme, config.ServiceName, 
func() []grpc.DialOption {
+               return []grpc.DialOption{}
+       })
+}
+
+func GetRoundRobinLbConn(config *Config) (*grpc.ClientConn, error) {
+       return getLbConn(config.Addrs, config.Scheme, config.ServiceName, 
func() []grpc.DialOption {
+               return []grpc.DialOption{
+                       grpc.WithDefaultServiceConfig(`{"loadBalancingConfig": 
[{"round_robin":{}}]}`),
+               }
+       })
+}
+
+func getLbConn(addrs []string, scheme, serviceName string, dialOptions func() 
[]grpc.DialOption) (*grpc.ClientConn, error) {
+       if len(addrs) <= 0 {
+               return nil, ErrAddrEmpty
+       }
+
+       addr := make([]resolver.Address, 0, len(addrs))
+       for _, a := range addrs {
+               addr = append(addr, resolver.Address{Addr: a})
+       }
+
+       r := manual.NewBuilderWithScheme(scheme)
+       r.InitialState(resolver.State{Addresses: addr})
+
+       opinions := dialOptions()
+       opinions = append(opinions, grpc.WithInsecure())
+       opinions = append(opinions, grpc.WithResolvers(r))
+
+       conn, err := grpc.Dial(r.Scheme()+":///"+serviceName, opinions...)
+       return conn, err
+}
diff --git a/server/rpc/sync/server.go b/server/rpc/sync/server.go
index 8dd0d43..a564c45 100644
--- a/server/rpc/sync/server.go
+++ b/server/rpc/sync/server.go
@@ -6,6 +6,13 @@ import (
 
        v1sync "github.com/apache/servicecomb-service-center/api/sync/v1"
        "github.com/apache/servicecomb-service-center/pkg/log"
+       "github.com/apache/servicecomb-service-center/server/config"
+)
+
+const (
+       HealthStatusConnected = "CONNECTED"
+       HealthStatusAbnormal  = "ABNORMAL"
+       HealthStatusClose     = "CLOSE"
 )
 
 type Server struct {
@@ -16,3 +23,11 @@ func (s *Server) Sync(ctx context.Context, events 
*v1sync.EventList) (*v1sync.Re
        log.Info(fmt.Sprintf("Received: %v", events.Events[0].Action))
        return &v1sync.Results{}, nil
 }
+
+func (s *Server) Health(ctx context.Context, request *v1sync.HealthRequest) 
(*v1sync.HealthReply, error) {
+       syncerEnabled := config.GetBool("sync.enableOnStart", false)
+       if !syncerEnabled {
+               return &v1sync.HealthReply{Status: HealthStatusClose}, nil
+       }
+       return &v1sync.HealthReply{Status: HealthStatusConnected}, nil
+}
diff --git a/syncer/config/config.go b/syncer/config/config.go
index 174fc7e..adccf33 100644
--- a/syncer/config/config.go
+++ b/syncer/config/config.go
@@ -33,27 +33,34 @@ type Config struct {
 }
 
 type Sync struct {
-       Peers []*Peer `yaml:"peers"`
+       EnableOnStart bool    `yaml:"enableOnStart"`
+       Peers         []*Peer `yaml:"peers"`
 }
 
 type Peer struct {
-       // TODO
+       Name      string   `yaml:"name"`
+       Kind      string   `yaml:"kind"`
+       Endpoints []string `yaml:"endpoints"`
+       Mode      []string `yaml:"mode"`
 }
 
 func Init() error {
        err := archaius.Init(archaius.WithMemorySource(), 
archaius.WithENVSource())
        if err != nil {
                log.Fatal("can not init archaius", err)
+               return err
        }
 
        err = archaius.AddFile(filepath.Join(util.GetAppRoot(), "conf", 
"syncer.yaml"))
        if err != nil {
                log.Warn(fmt.Sprintf("can not add syncer config file source, 
error: %s", err))
+               return err
        }
 
        err = Reload()
        if err != nil {
                log.Fatal("reload syncer configs failed", err)
+               return err
        }
        return nil
 }
@@ -71,3 +78,8 @@ func Reload() error {
 func GetConfig() Config {
        return config
 }
+
+// SetConfig for UT
+func SetConfig(c Config) {
+       config = c
+}
diff --git a/syncer/config/config_test.go b/syncer/config/config_test.go
index f494184..7c753f9 100644
--- a/syncer/config/config_test.go
+++ b/syncer/config/config_test.go
@@ -18,6 +18,9 @@
 package config_test
 
 import (
+       "os"
+       "path/filepath"
+       "strings"
        "testing"
 
        "github.com/apache/servicecomb-service-center/syncer/config"
@@ -25,6 +28,14 @@ import (
 )
 
 func TestGetConfig(t *testing.T) {
+       changeConfigPath()
        assert.NoError(t, config.Init())
        assert.NotNil(t, config.GetConfig().Sync)
 }
+
+func changeConfigPath() {
+       workDir, _ := os.Getwd()
+       replacePath := filepath.Join("syncer", "config")
+       workDir = strings.ReplaceAll(workDir, replacePath, "etc")
+       os.Setenv("APP_ROOT", workDir)
+}
diff --git a/syncer/resource/admin/admin.go b/syncer/resource/admin/admin.go
index 8c971cc..784b5a1 100644
--- a/syncer/resource/admin/admin.go
+++ b/syncer/resource/admin/admin.go
@@ -20,20 +20,37 @@ package admin
 import (
        "net/http"
 
+       "github.com/apache/servicecomb-service-center/pkg/log"
        "github.com/apache/servicecomb-service-center/pkg/rest"
+       "github.com/apache/servicecomb-service-center/syncer/service/admin"
+       "github.com/go-chassis/cari/discovery"
+       "github.com/go-chassis/cari/rbac"
 )
 
+const (
+       APIHealth = "/v1/syncer/health"
+)
+
+func init() {
+       rbac.Add2WhiteAPIList(APIHealth)
+}
+
 type Resource struct {
 }
 
 // URLPatterns 路由
 func (res *Resource) URLPatterns() []rest.Route {
        return []rest.Route{
-               {Method: http.MethodGet, Path: "/v1/syncer/health", Func: 
res.HealthCheck},
+               {Method: http.MethodGet, Path: APIHealth, Func: 
res.HealthCheck},
        }
 }
 
 func (res *Resource) HealthCheck(w http.ResponseWriter, r *http.Request) {
-       // TODO call health service
-       rest.WriteResponse(w, r, nil, nil)
+       healthResp, err := admin.Health()
+       if err != nil {
+               log.Error("health check failed", err)
+               rest.WriteError(w, discovery.ErrInternal, err.Error())
+               return
+       }
+       rest.WriteResponse(w, r, nil, healthResp)
 }
diff --git a/syncer/service/admin/health.go b/syncer/service/admin/health.go
new file mode 100644
index 0000000..b7c136e
--- /dev/null
+++ b/syncer/service/admin/health.go
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package admin
+
+import (
+       "context"
+       "errors"
+
+       v1sync "github.com/apache/servicecomb-service-center/api/sync/v1"
+       "github.com/apache/servicecomb-service-center/client"
+       "github.com/apache/servicecomb-service-center/pkg/rpc"
+       "github.com/apache/servicecomb-service-center/server/rpc/sync"
+       "github.com/apache/servicecomb-service-center/syncer/config"
+)
+
+const (
+       scheme      = "health_rpc"
+       serviceName = "syncer"
+)
+
+var (
+       ErrConfigIsEmpty = errors.New("sync config is empty")
+)
+
+type Resp struct {
+       Peers []*Peer `json:"peers"`
+}
+
+type Peer struct {
+       Name      string   `json:"name"`
+       Kind      string   `json:"kind"`
+       Mode      []string `json:"mode"`
+       Endpoints []string `json:"endpoints"`
+       Status    string   `json:"status"`
+}
+
+func Health() (*Resp, error) {
+
+       config := config.GetConfig()
+       if config.Sync == nil || len(config.Sync.Peers) <= 0 {
+               return nil, ErrConfigIsEmpty
+       }
+
+       resp := &Resp{Peers: make([]*Peer, 0, len(config.Sync.Peers))}
+
+       for _, c := range config.Sync.Peers {
+               if len(c.Endpoints) <= 0 {
+                       continue
+               }
+               p := &Peer{
+                       Name:      c.Name,
+                       Kind:      c.Kind,
+                       Mode:      c.Mode,
+                       Endpoints: c.Endpoints,
+               }
+               p.Status = getPeerStatus(c.Endpoints)
+               resp.Peers = append(resp.Peers, p)
+       }
+
+       if len(resp.Peers) <= 0 {
+               return nil, ErrConfigIsEmpty
+       }
+
+       return resp, nil
+}
+
+func getPeerStatus(endpoints []string) string {
+       conn, err := rpc.GetRoundRobinLbConn(&rpc.Config{Addrs: endpoints, 
Scheme: scheme, ServiceName: serviceName})
+       if err != nil || conn == nil {
+               return sync.HealthStatusAbnormal
+       }
+       defer conn.Close()
+
+       set := client.NewSet(conn)
+       reply, err := set.EventServiceClient.Health(context.Background(), 
&v1sync.HealthRequest{})
+       if err != nil || reply == nil {
+               return sync.HealthStatusAbnormal
+       }
+       return reply.Status
+}
diff --git a/syncer/service/admin/health_test.go 
b/syncer/service/admin/health_test.go
new file mode 100644
index 0000000..60d5813
--- /dev/null
+++ b/syncer/service/admin/health_test.go
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package admin
+
+import (
+       "errors"
+       "fmt"
+       "os"
+       "path/filepath"
+       "strings"
+       "testing"
+       "time"
+
+       "github.com/apache/servicecomb-service-center/syncer/config"
+       "github.com/stretchr/testify/assert"
+)
+
+func TestHealth(t *testing.T) {
+       c := config.GetConfig()
+       tests := []struct {
+               name    string
+               sync    *config.Sync
+               wantErr bool
+       }{
+               {name: "check no config ",
+                       sync:    nil,
+                       wantErr: true,
+               },
+               {name: "check no dataCenter",
+                       sync: &config.Sync{
+                               Peers: []*config.Peer{},
+                       },
+                       wantErr: true,
+               },
+               {name: "check no endpoints",
+                       sync: &config.Sync{
+                               Peers: []*config.Peer{
+                                       {Endpoints: nil},
+                               },
+                       },
+                       wantErr: true,
+               },
+               {name: "check endpoints is empty",
+                       sync: &config.Sync{
+                               Peers: []*config.Peer{
+                                       {Endpoints: []string{}},
+                               },
+                       },
+                       wantErr: true,
+               },
+
+               {name: "given normal config",
+                       sync: &config.Sync{
+                               Peers: []*config.Peer{
+                                       {Endpoints: 
[]string{"127.0.0.1:30105"}},
+                               },
+                       },
+                       wantErr: false,
+               },
+       }
+
+       for _, test := range tests {
+               c.Sync = test.sync
+               config.SetConfig(c)
+               resp, err := Health()
+               hasErr := checkError(resp, err)
+               assert.Equal(t, hasErr, test.wantErr, fmt.Sprintf("%s. health, 
wantErr %+v", test.name, test.wantErr))
+       }
+}
+
+func checkError(resp *Resp, err error) bool {
+       if err != nil {
+               return true
+       }
+
+       if resp.Peers == nil {
+               return true
+       }
+
+       if len(resp.Peers) <= 0 {
+               return true
+       }
+       return false
+}
+
+func TestHealthTotalTime(t *testing.T) {
+       changeConfigPath()
+       assert.NoError(t, config.Init())
+       now := time.Now()
+       _, err := Health()
+       assert.NoError(t, err)
+       healthEndTime := time.Now()
+       if healthEndTime.Sub(now) >= time.Second*30 {
+               assert.NoError(t, errors.New("health api total time is too 
long"))
+       }
+}
+
+func changeConfigPath() {
+       workDir, _ := os.Getwd()
+       replacePath := filepath.Join("syncer", "service", "admin")
+       workDir = strings.ReplaceAll(workDir, replacePath, "etc")
+       os.Setenv("APP_ROOT", workDir)
+}

Reply via email to