This is an automated email from the ASF dual-hosted git repository.

youling1128 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/servicecomb-service-center.git


The following commit(s) were added to refs/heads/dev by this push:
     new ac086ede Syncer supports to enable rbac (#1491)
ac086ede is described below

commit ac086ede84dfa1e122ea58ee4d3e5005ff733b69
Author: humingcheng <[email protected]>
AuthorDate: Tue Oct 8 18:12:00 2024 +0800

    Syncer supports to enable rbac (#1491)
---
 .github/workflows/eventbase-ci.yml      |   4 +-
 .github/workflows/static_check.yml      |   2 +-
 docker-compose.yml                      |   2 +-
 etc/conf/syncer.yaml                    |   2 +
 scripts/integration_test.sh             |   2 +-
 scripts/ut_test_in_docker.sh            |   4 +-
 syncer/config/config.go                 |  13 +++-
 syncer/rpc/auth.go                      |  63 ++++++++++++++++
 syncer/rpc/auth_test.go                 | 130 ++++++++++++++++++++++++++++++++
 syncer/rpc/server.go                    |  38 +++++++++-
 syncer/rpc/server_test.go               |  85 +++++++++++++++++++++
 syncer/server/server.go                 |  14 +++-
 syncer/service/admin/health.go          |  55 +++++++++-----
 syncer/service/replicator/replicator.go |  33 ++++++--
 14 files changed, 405 insertions(+), 42 deletions(-)

diff --git a/.github/workflows/eventbase-ci.yml b/.github/workflows/eventbase-ci.yml
index b0918b7a..87dfd75c 100644
--- a/.github/workflows/eventbase-ci.yml
+++ b/.github/workflows/eventbase-ci.yml
@@ -13,7 +13,7 @@ jobs:
         uses: actions/checkout@v1
       - name: UT test
         run: |
-          sudo docker-compose -f ./scripts/docker-compose.yaml up -d
+          sudo docker compose -f ./scripts/docker-compose.yaml up -d
           sleep 20
           export TEST_DB_MODE=mongo
           export TEST_DB_URI=mongodb://127.0.0.1:27017
@@ -31,7 +31,7 @@ jobs:
         uses: actions/checkout@v1
       - name: UT for etcd
         run: |
-          time docker run -d -p 2379:2379 --name etcd quay.io/coreos/etcd etcd -name etcd --advertise-client-urls http://0.0.0.0:2379 --listen-client-urls http://0.0.0.0:2379
+          time docker run -d -p 2379:2379 --name etcd quay.io/coreos/etcd:v3.5.15 etcd -name etcd --advertise-client-urls http://0.0.0.0:2379 --listen-client-urls http://0.0.0.0:2379
           while ! nc -z 127.0.0.1 2379; do
             sleep 1
           done
diff --git a/.github/workflows/static_check.yml b/.github/workflows/static_check.yml
index fe95ef40..00b4a8e4 100644
--- a/.github/workflows/static_check.yml
+++ b/.github/workflows/static_check.yml
@@ -40,7 +40,7 @@ jobs:
         uses: actions/checkout@v1
       - name: UT-MONGO
         run: |
-          sudo docker-compose -f ./scripts/docker-compose.yaml up -d
+          sudo docker compose -f ./scripts/docker-compose.yaml up -d
           sleep 20
           bash -x scripts/ut_test_in_docker.sh mongo
   local:
diff --git a/docker-compose.yml b/docker-compose.yml
index c7be231e..e50d3d99 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -17,7 +17,7 @@
 version: '3'
 services:
   etcd:
-    image: 'quay.io/coreos/etcd:latest'
+    image: 'quay.io/coreos/etcd:v3.5.15'
     # restart: always
     #ports:
     #  - "2379:2379"
diff --git a/etc/conf/syncer.yaml b/etc/conf/syncer.yaml
index 784f86a4..11b65a64 100644
--- a/etc/conf/syncer.yaml
+++ b/etc/conf/syncer.yaml
@@ -1,11 +1,13 @@
 sync:
   enableOnStart: false
+  rbacEnabled: false
   peers:
     - name: dc
       kind: servicecomb
       endpoints: ["127.0.0.1:30105"]
       # only allow mode implemented in incremental approach like push, watch(such as pub/sub, long polling)
       mode: [push]
+      token:
   tombstone:
     retire:
       # use linux crontab not Quartz cron
diff --git a/scripts/integration_test.sh b/scripts/integration_test.sh
index 6b4c4a92..a5984f73 100755
--- a/scripts/integration_test.sh
+++ b/scripts/integration_test.sh
@@ -41,7 +41,7 @@ set +e
 docker rm -f etcd
 kill -9 $(ps aux | grep 'service-center' | awk '{print $2}')
 set -e
-sudo docker run -d -v /usr/share/ca-certificates/:/etc/ssl/certs -p 40010:40010 -p 23800:23800 -p 2379:2379 --name etcd quay.io/coreos/etcd etcd -name etcd0 -advertise-client-urls http://127.0.0.1:2379,http://127.0.0.1:40010 -listen-client-urls http://0.0.0.0:2379,http://0.0.0.0:40010 -initial-advertise-peer-urls http://127.0.0.1:23800 -listen-peer-urls http://0.0.0.0:23800 -initial-cluster-token etcd-cluster-1 -initial-cluster etcd0=http://127.0.0.1:23800 -initial-cluster-state new
+sudo docker run -d -v /usr/share/ca-certificates/:/etc/ssl/certs -p 40010:40010 -p 23800:23800 -p 2379:2379 --name etcd quay.io/coreos/etcd:v3.5.15 etcd -name etcd0 -advertise-client-urls http://127.0.0.1:2379,http://127.0.0.1:40010 -listen-client-urls http://0.0.0.0:2379,http://0.0.0.0:40010 -initial-advertise-peer-urls http://127.0.0.1:23800 -listen-peer-urls http://0.0.0.0:23800 -initial-cluster-token etcd-cluster-1 -initial-cluster etcd0=http://127.0.0.1:23800 -initial-cluster-state new
 while ! nc -z 127.0.0.1 2379; do
   echo "Waiting Etcd to launch on 2379..."
   sleep 1
diff --git a/scripts/ut_test_in_docker.sh b/scripts/ut_test_in_docker.sh
index 9e76d121..357d0e6f 100644
--- a/scripts/ut_test_in_docker.sh
+++ b/scripts/ut_test_in_docker.sh
@@ -31,7 +31,7 @@ echo "${green}Starting Unit Testing for Service Center${reset}"
 
 if [ "${db_name}" == "etcd" ];then
   echo "${green}Starting etcd in docker${reset}"
-  docker run -d -v /usr/share/ca-certificates/:/etc/ssl/certs -p 40010:40010 -p 23800:23800 -p 2379:2379 --name etcd quay.io/coreos/etcd etcd -name etcd0 -advertise-client-urls http://127.0.0.1:2379,http://127.0.0.1:40010 -listen-client-urls http://0.0.0.0:2379,http://0.0.0.0:40010 -initial-advertise-peer-urls http://127.0.0.1:23800 -listen-peer-urls http://0.0.0.0:23800 -initial-cluster-token etcd-cluster-1 -initial-cluster etcd0=http://127.0.0.1:23800 -initial-cluster-state new
+  docker run -d -v /usr/share/ca-certificates/:/etc/ssl/certs -p 40010:40010 -p 23800:23800 -p 2379:2379 --name etcd quay.io/coreos/etcd:v3.5.15 etcd -name etcd0 -advertise-client-urls http://127.0.0.1:2379,http://127.0.0.1:40010 -listen-client-urls http://0.0.0.0:2379,http://0.0.0.0:40010 -initial-advertise-peer-urls http://127.0.0.1:23800 -listen-peer-urls http://0.0.0.0:23800 -initial-cluster-token etcd-cluster-1 -initial-cluster etcd0=http://127.0.0.1:23800 -initial-cluster-state new
   while ! nc -z 127.0.0.1 2379; do
     echo "Waiting Etcd to launch on 2379..."
     sleep 1
@@ -45,7 +45,7 @@ elif [ ${db_name} == "mongo" ];then
   echo "${green}mongodb is running......${reset}"
 elif [ ${db_name} == "local" ];then
   echo "${green}Starting etcd in docker${reset}"
-  docker run -d -v /usr/share/ca-certificates/:/etc/ssl/certs -p 40010:40010 -p 23800:23800 -p 2379:2379 --name etcd quay.io/coreos/etcd etcd -name etcd0 -advertise-client-urls http://127.0.0.1:2379,http://127.0.0.1:40010 -listen-client-urls http://0.0.0.0:2379,http://0.0.0.0:40010 -initial-advertise-peer-urls http://127.0.0.1:23800 -listen-peer-urls http://0.0.0.0:23800 -initial-cluster-token etcd-cluster-1 -initial-cluster etcd0=http://127.0.0.1:23800 -initial-cluster-state new
+  docker run -d -v /usr/share/ca-certificates/:/etc/ssl/certs -p 40010:40010 -p 23800:23800 -p 2379:2379 --name etcd quay.io/coreos/etcd:v3.5.15 etcd -name etcd0 -advertise-client-urls http://127.0.0.1:2379,http://127.0.0.1:40010 -listen-client-urls http://0.0.0.0:2379,http://0.0.0.0:40010 -initial-advertise-peer-urls http://127.0.0.1:23800 -listen-peer-urls http://0.0.0.0:23800 -initial-cluster-token etcd-cluster-1 -initial-cluster etcd0=http://127.0.0.1:23800 -initial-cluster-state new
   while ! nc -z 127.0.0.1 2379; do
     echo "Waiting Etcd to launch on 2379..."
     sleep 1
diff --git a/syncer/config/config.go b/syncer/config/config.go
index de01f5ec..a568d5c9 100644
--- a/syncer/config/config.go
+++ b/syncer/config/config.go
@@ -21,9 +21,10 @@ import (
        "fmt"
        "path/filepath"
 
+       "github.com/go-chassis/go-archaius"
+
        "github.com/apache/servicecomb-service-center/pkg/log"
        "github.com/apache/servicecomb-service-center/pkg/util"
-       "github.com/go-chassis/go-archaius"
 )
 
 var config Config
@@ -33,8 +34,12 @@ type Config struct {
 }
 
 type Sync struct {
-       EnableOnStart bool    `yaml:"enableOnStart"`
-       Peers         []*Peer `yaml:"peers"`
+       EnableOnStart bool `yaml:"enableOnStart"`
+       // When RbacEnabled is true, syncer's API requires the rbac token,
+       // and service-center also provides the rbac token to communicate with 
peer.
+       // At the same time, service-center rbac must be enabled.
+       RbacEnabled bool    `yaml:"rbacEnabled"`
+       Peers       []*Peer `yaml:"peers"`
 }
 
 type Peer struct {
@@ -42,6 +47,8 @@ type Peer struct {
        Kind      string   `yaml:"kind"`
        Endpoints []string `yaml:"endpoints"`
        Mode      []string `yaml:"mode"`
+       // The token to communicate with peer, this takes effect only when 
RbacEnabled is true
+       Token string `yaml:"token"`
 }
 
 func Init() error {
diff --git a/syncer/rpc/auth.go b/syncer/rpc/auth.go
new file mode 100644
index 00000000..9f3ce607
--- /dev/null
+++ b/syncer/rpc/auth.go
@@ -0,0 +1,63 @@
+package rpc
+
+import (
+       "context"
+       "fmt"
+       "strings"
+
+       "github.com/go-chassis/cari/rbac"
+       "github.com/go-chassis/go-chassis/v2/security/authr"
+       "github.com/go-chassis/go-chassis/v2/server/restful"
+       "google.golang.org/grpc/metadata"
+
+       "github.com/apache/servicecomb-service-center/pkg/log"
+       "github.com/apache/servicecomb-service-center/syncer/config"
+)
+
+var errWrongAccountNorRole = fmt.Errorf("account should be %s, and roles 
should contain %s", RbacAllowedAccountName, RbacAllowedRoleName)
+
+func auth(ctx context.Context) error {
+       if !config.GetConfig().Sync.RbacEnabled {
+               return nil
+       }
+       md, ok := metadata.FromIncomingContext(ctx)
+       if !ok {
+               return rbac.NewError(rbac.ErrNoAuthHeader, "")
+       }
+
+       authHeader := md.Get(restful.HeaderAuth)
+       if len(authHeader) == 0 {
+               return rbac.NewError(rbac.ErrNoAuthHeader, fmt.Sprintf("header 
%s not found nor content empty", restful.HeaderAuth))
+       }
+
+       s := strings.Split(authHeader[0], " ")
+       if len(s) != 2 {
+               return rbac.ErrInvalidHeader
+       }
+       to := s[1]
+
+       claims, err := authr.Authenticate(ctx, to)
+       if err != nil {
+               return err
+       }
+       m, ok := claims.(map[string]interface{})
+       if !ok {
+               log.Error("claims convert failed", rbac.ErrConvert)
+               return rbac.ErrConvert
+       }
+       account, err := rbac.GetAccount(m)
+       if err != nil {
+               log.Error("get account from token failed", err)
+               return err
+       }
+
+       if account.Name != RbacAllowedAccountName {
+               return errWrongAccountNorRole
+       }
+       for _, role := range account.Roles {
+               if role == RbacAllowedRoleName {
+                       return nil
+               }
+       }
+       return errWrongAccountNorRole
+}
diff --git a/syncer/rpc/auth_test.go b/syncer/rpc/auth_test.go
new file mode 100644
index 00000000..2cc1bcba
--- /dev/null
+++ b/syncer/rpc/auth_test.go
@@ -0,0 +1,130 @@
+package rpc
+
+import (
+       "context"
+       "encoding/json"
+       "errors"
+       "fmt"
+       "testing"
+
+       "github.com/go-chassis/cari/pkg/errsvc"
+       "github.com/go-chassis/cari/rbac"
+       "github.com/go-chassis/go-chassis/v2/security/authr"
+       "github.com/go-chassis/go-chassis/v2/server/restful"
+       "github.com/stretchr/testify/assert"
+       "google.golang.org/grpc/metadata"
+
+       "github.com/apache/servicecomb-service-center/syncer/config"
+)
+
+type testAuth struct{}
+
+func (testAuth) Login(ctx context.Context, user string, password string, opts 
...authr.LoginOption) (string, error) {
+       return "", nil
+}
+
+func (testAuth) Authenticate(ctx context.Context, token string) (interface{}, 
error) {
+       var claim map[string]interface{}
+       return claim, json.Unmarshal([]byte(token), &claim)
+}
+
+func Test_auth(t *testing.T) {
+       // use the custom auth plugin
+       authr.Install("test", func(opts *authr.Options) (authr.Authenticator, 
error) {
+               return testAuth{}, nil
+       })
+       assert.NoError(t, authr.Init(authr.WithPlugin("test")))
+
+       type args struct {
+               ctx context.Context
+       }
+       tests := []struct {
+               name    string
+               preDo   func()
+               args    args
+               wantErr assert.ErrorAssertionFunc
+       }{
+               {
+                       name: "sync rbac disables",
+                       preDo: func() {
+                               config.SetConfig(config.Config{
+                                       Sync: &config.Sync{
+                                               RbacEnabled: false,
+                                       }})
+                       },
+                       args: args{
+                               ctx: context.Background(), // rbac disabled, 
empty ctx should pass the auth
+                       },
+                       wantErr: assert.NoError,
+               },
+               {
+                       name: "no header",
+                       preDo: func() {
+                               config.SetConfig(config.Config{
+                                       Sync: &config.Sync{
+                                               RbacEnabled: true,
+                                       }})
+                       },
+                       args: args{
+                               ctx: context.Background(), // rbac enabled, 
empty ctx should not pass the auth
+                       },
+                       wantErr: func(t assert.TestingT, err error, i 
...interface{}) bool {
+                               var errSvcErr *errsvc.Error
+                               ok := errors.As(err, &errSvcErr)
+                               assert.True(t, ok)
+
+                               return assert.Equal(t, rbac.ErrNoAuthHeader, 
errSvcErr.Code)
+                       },
+               },
+               {
+                       name: "with header but no auth header",
+                       args: args{
+                               ctx: 
metadata.NewIncomingContext(context.Background(), 
metadata.New(map[string]string{"fake": "fake"})),
+                       },
+                       wantErr: func(t assert.TestingT, err error, i 
...interface{}) bool {
+                               var errSvcErr *errsvc.Error
+                               ok := errors.As(err, &errSvcErr)
+                               assert.True(t, ok)
+
+                               return assert.Equal(t, rbac.ErrNoAuthHeader, 
errSvcErr.Code)
+                       },
+               },
+               {
+                       name: "auth header format error",
+                       args: args{
+                               ctx: 
metadata.NewIncomingContext(context.Background(), 
metadata.New(map[string]string{restful.HeaderAuth: "fake"})),
+                       },
+                       wantErr: func(t assert.TestingT, err error, i 
...interface{}) bool {
+                               return assert.Equal(t, rbac.ErrInvalidHeader, 
err)
+                       },
+               },
+               {
+                       name: "wrong account nor role",
+                       args: args{
+                               ctx: 
metadata.NewIncomingContext(context.Background(),
+                                       
metadata.New(map[string]string{restful.HeaderAuth: `Bear 
{"account":"x","roles":["x"]}`})),
+                       },
+                       wantErr: func(t assert.TestingT, err error, i 
...interface{}) bool {
+                               return assert.Equal(t, errWrongAccountNorRole, 
err)
+                       },
+               },
+               {
+                       name: "valid token",
+                       args: args{
+                               ctx: 
metadata.NewIncomingContext(context.Background(),
+                                       
metadata.New(map[string]string{restful.HeaderAuth: `Bear 
{"account":"sync-user","roles":["sync-admin"]}`})),
+                       },
+                       wantErr: func(t assert.TestingT, err error, i 
...interface{}) bool {
+                               return assert.NoError(t, err)
+                       },
+               },
+       }
+       for _, tt := range tests {
+               t.Run(tt.name, func(t *testing.T) {
+                       if tt.preDo != nil {
+                               tt.preDo()
+                       }
+                       tt.wantErr(t, auth(tt.args.ctx), 
fmt.Sprintf("auth(%v)", tt.args.ctx))
+               })
+       }
+}
diff --git a/syncer/rpc/server.go b/syncer/rpc/server.go
index 990f7894..dc6efee0 100644
--- a/syncer/rpc/server.go
+++ b/syncer/rpc/server.go
@@ -22,18 +22,21 @@ import (
        "fmt"
        "time"
 
-       "github.com/apache/servicecomb-service-center/syncer/service/replicator"
-       
"github.com/apache/servicecomb-service-center/syncer/service/replicator/resource"
-
        "github.com/apache/servicecomb-service-center/pkg/log"
        v1sync "github.com/apache/servicecomb-service-center/syncer/api/v1"
        "github.com/apache/servicecomb-service-center/syncer/config"
+       "github.com/apache/servicecomb-service-center/syncer/service/replicator"
+       
"github.com/apache/servicecomb-service-center/syncer/service/replicator/resource"
 )
 
 const (
        HealthStatusConnected = "CONNECTED"
        HealthStatusAbnormal  = "ABNORMAL"
        HealthStatusClose     = "CLOSE"
+       HealthStatusAuthFail  = "AuthFail"
+
+       RbacAllowedAccountName = "sync-user"
+       RbacAllowedRoleName    = "sync-admin"
 )
 
 func NewServer() *Server {
@@ -49,6 +52,12 @@ type Server struct {
 }
 
 func (s *Server) Sync(ctx context.Context, events *v1sync.EventList) 
(*v1sync.Results, error) {
+       err := auth(ctx)
+       if err != nil {
+               log.Error("auth failed", err)
+               return generateFailedResults(events, err)
+       }
+
        log.Info(fmt.Sprintf("start sync: %s", events.Flag()))
 
        res := s.replicator.Persist(ctx, events)
@@ -56,6 +65,20 @@ func (s *Server) Sync(ctx context.Context, events *v1sync.EventList) (*v1sync.Re
        return s.toResults(res), nil
 }
 
+func generateFailedResults(events *v1sync.EventList, err error) 
(*v1sync.Results, error) {
+       if events == nil || len(events.Events) == 0 {
+               return &v1sync.Results{Results: map[string]*v1sync.Result{}}, 
nil
+       }
+       rsts := make(map[string]*v1sync.Result, len(events.Events))
+       for _, evt := range events.Events {
+               rsts[evt.Id] = &v1sync.Result{
+                       Code:    resource.Fail,
+                       Message: err.Error(),
+               }
+       }
+       return &v1sync.Results{Results: rsts}, nil
+}
+
 func (s *Server) toResults(results []*resource.Result) *v1sync.Results {
        syncResult := make(map[string]*v1sync.Result, len(results))
        for _, r := range results {
@@ -69,11 +92,18 @@ func (s *Server) toResults(results []*resource.Result) *v1sync.Results {
        }
 }
 
-func (s *Server) Health(_ context.Context, _ *v1sync.HealthRequest) 
(*v1sync.HealthReply, error) {
+func (s *Server) Health(ctx context.Context, _ *v1sync.HealthRequest) 
(*v1sync.HealthReply, error) {
        resp := &v1sync.HealthReply{
                Status:         HealthStatusConnected,
                LocalTimestamp: time.Now().UnixNano(),
        }
+       err := auth(ctx)
+       if err != nil {
+               resp.Status = HealthStatusAuthFail
+               log.Error("auth failed", err)
+               return resp, nil
+       }
+
        // TODO enable to close syncer
        if !config.GetConfig().Sync.EnableOnStart {
                resp.Status = HealthStatusClose
diff --git a/syncer/rpc/server_test.go b/syncer/rpc/server_test.go
new file mode 100644
index 00000000..4123c1ac
--- /dev/null
+++ b/syncer/rpc/server_test.go
@@ -0,0 +1,85 @@
+package rpc
+
+import (
+       "context"
+       "reflect"
+       "testing"
+
+       "github.com/go-chassis/cari/rbac"
+       "github.com/stretchr/testify/assert"
+
+       v1sync "github.com/apache/servicecomb-service-center/syncer/api/v1"
+       "github.com/apache/servicecomb-service-center/syncer/config"
+       
"github.com/apache/servicecomb-service-center/syncer/service/replicator/resource"
+)
+
+type testReplicator struct{}
+
+func (testReplicator) Replicate(ctx context.Context, el *v1sync.EventList) 
(*v1sync.Results, error) {
+       return &v1sync.Results{Results: map[string]*v1sync.Result{
+               "constant_id": &v1sync.Result{
+                       Code: resource.Success,
+               },
+       }}, nil
+}
+
+func (testReplicator) Persist(ctx context.Context, el *v1sync.EventList) 
[]*resource.Result {
+       return []*resource.Result{
+               &resource.Result{
+                       EventID: "constant_id",
+                       Status:  resource.Success,
+               },
+       }
+}
+
+func TestServer_Sync(t *testing.T) {
+       s := NewServer()
+
+       // rbac enabled, should sync failed and return auth failed message
+       config.SetConfig(config.Config{
+               Sync: &config.Sync{
+                       RbacEnabled: true,
+               }})
+       events := &v1sync.EventList{Events: []*v1sync.Event{
+               {
+                       Id: "evt1",
+               },
+               {
+                       Id: "evt2",
+               },
+       }}
+
+       expectedRst := map[string]*v1sync.Result{
+               "evt1": &v1sync.Result{
+                       Code:    resource.Fail,
+                       Message: rbac.NewError(rbac.ErrNoAuthHeader, 
"").Error(),
+               },
+
+               "evt2": &v1sync.Result{
+                       Code:    resource.Fail,
+                       Message: rbac.NewError(rbac.ErrNoAuthHeader, 
"").Error(),
+               },
+       }
+       rst, err := s.Sync(context.Background(), events)
+       assert.NoError(t, err)
+       assert.True(t, reflect.DeepEqual(expectedRst, rst.Results))
+
+       rst, err = s.Sync(context.Background(), nil) // nil input
+       assert.NoError(t, err)
+       assert.Equal(t, 0, len(rst.Results))
+
+       // rbac disabled, should sync success(with the mock replicator)
+       config.SetConfig(config.Config{
+               Sync: &config.Sync{
+                       RbacEnabled: false,
+               }})
+       expectedRst = map[string]*v1sync.Result{
+               "constant_id": &v1sync.Result{
+                       Code: resource.Success,
+               },
+       }
+       s.replicator = &testReplicator{}
+       rst, err = s.Sync(context.Background(), events)
+       assert.NoError(t, err)
+       assert.True(t, reflect.DeepEqual(expectedRst, rst.Results))
+}
diff --git a/syncer/server/server.go b/syncer/server/server.go
index c880d55f..095ae615 100644
--- a/syncer/server/server.go
+++ b/syncer/server/server.go
@@ -18,6 +18,9 @@
 package server
 
 import (
+       "github.com/go-chassis/go-chassis/v2"
+       chassisServer "github.com/go-chassis/go-chassis/v2/core/server"
+
        "github.com/apache/servicecomb-service-center/pkg/log"
        syncv1 "github.com/apache/servicecomb-service-center/syncer/api/v1"
        "github.com/apache/servicecomb-service-center/syncer/config"
@@ -25,8 +28,6 @@ import (
        "github.com/apache/servicecomb-service-center/syncer/rpc"
        "github.com/apache/servicecomb-service-center/syncer/service/admin"
        "github.com/apache/servicecomb-service-center/syncer/service/sync"
-       "github.com/go-chassis/go-chassis/v2"
-       chassisServer "github.com/go-chassis/go-chassis/v2/core/server"
 )
 
 // Run register chassis schema and run syncer services before chassis.Run()
@@ -40,6 +41,15 @@ func Run() {
                return
        }
 
+       if len(config.GetConfig().Sync.Peers) <= 0 {
+               log.Warn("peers parameter configuration is empty")
+               return
+       }
+
+       if config.GetConfig().Sync.RbacEnabled {
+               log.Info("syncer rbac enabled")
+       }
+
        chassis.RegisterSchema("grpc", rpc.NewServer(),
                
chassisServer.WithRPCServiceDesc(&syncv1.EventService_ServiceDesc))
 
diff --git a/syncer/service/admin/health.go b/syncer/service/admin/health.go
index 99c9db79..cdba7358 100644
--- a/syncer/service/admin/health.go
+++ b/syncer/service/admin/health.go
@@ -20,13 +20,17 @@ package admin
 import (
        "context"
        "errors"
+       "fmt"
        "time"
 
+       "github.com/go-chassis/go-chassis/v2/server/restful"
        "google.golang.org/grpc"
+       "google.golang.org/grpc/metadata"
 
        "github.com/apache/servicecomb-service-center/client"
        "github.com/apache/servicecomb-service-center/pkg/log"
        pkgrpc "github.com/apache/servicecomb-service-center/pkg/rpc"
+       
"github.com/apache/servicecomb-service-center/server/plugin/security/cipher"
        v1sync "github.com/apache/servicecomb-service-center/syncer/api/v1"
        syncerclient 
"github.com/apache/servicecomb-service-center/syncer/client"
        "github.com/apache/servicecomb-service-center/syncer/config"
@@ -59,25 +63,15 @@ type Peer struct {
        Mode      []string `json:"mode"`
        Endpoints []string `json:"endpoints"`
        Status    string   `json:"status"`
+       Token     string   `json:"-"`
 }
 
 func Init() {
        cfg := config.GetConfig()
-       if cfg.Sync == nil {
-               log.Warn("sync config is empty")
-               return
-       }
-       if !cfg.Sync.EnableOnStart {
-               log.Info("syncer is disabled")
-               return
-       }
-       if len(cfg.Sync.Peers) <= 0 {
-               log.Warn("peers parameter configuration is empty")
-               return
-       }
        peerInfos = make([]*PeerInfo, 0, len(cfg.Sync.Peers))
        for _, c := range cfg.Sync.Peers {
                if len(c.Endpoints) <= 0 {
+                       log.Warn("no endpoints of peer: " + c.Name)
                        continue
                }
                p := &Peer{
@@ -86,10 +80,21 @@ func Init() {
                        Mode:      c.Mode,
                        Endpoints: c.Endpoints,
                }
+               if config.GetConfig().Sync.RbacEnabled {
+                       plainToken, err := cipher.Decrypt(c.Token)
+                       if err != nil {
+                               log.Error(fmt.Sprintf("decrypt token of peer %s 
failed, use original content", c.Name), err)
+                               plainToken = c.Token
+                       }
+                       p.Token = plainToken
+               }
+
                conn, err := newRPCConn(p.Endpoints)
-               if err == nil {
-                       peerInfos = append(peerInfos, &PeerInfo{Peer: p, 
ClientConn: conn})
+               if err != nil {
+                       log.Error(fmt.Sprintf("new client failed for peer: %s", 
c.Name), err)
+                       continue
                }
+               peerInfos = append(peerInfos, &PeerInfo{Peer: p, ClientConn: 
conn})
        }
 }
 
@@ -103,7 +108,7 @@ func Health() (*Resp, error) {
                if len(peerInfo.Peer.Endpoints) <= 0 {
                        continue
                }
-               status := getPeerStatus(peerInfo.Peer.Name, peerInfo.ClientConn)
+               status := getPeerStatus(peerInfo)
                resp.Peers = append(resp.Peers, &Peer{
                        Name:      peerInfo.Peer.Name,
                        Kind:      peerInfo.Peer.Kind,
@@ -117,19 +122,25 @@ func Health() (*Resp, error) {
        return resp, nil
 }
 
-func getPeerStatus(peerName string, clientConn *grpc.ClientConn) string {
-       if clientConn == nil {
+func getPeerStatus(peerInfo *PeerInfo) string {
+       if peerInfo.ClientConn == nil {
                log.Warn("clientConn is nil")
                return rpc.HealthStatusAbnormal
        }
        local := time.Now().UnixNano()
-       set := client.NewSet(clientConn)
-       reply, err := set.EventServiceClient.Health(context.Background(), 
&v1sync.HealthRequest{})
+       set := client.NewSet(peerInfo.ClientConn)
+       ctx := context.Background()
+       if config.GetConfig().Sync.RbacEnabled {
+               ctx = metadata.NewOutgoingContext(ctx, 
metadata.New(map[string]string{
+                       restful.HeaderAuth: "Bearer " + peerInfo.Peer.Token,
+               }))
+       }
+       reply, err := set.EventServiceClient.Health(ctx, 
&v1sync.HealthRequest{})
        if err != nil || reply == nil {
                log.Error("get peer health failed", err)
                return rpc.HealthStatusAbnormal
        }
-       reportClockDiff(peerName, local, reply.LocalTimestamp)
+       reportClockDiff(peerInfo.Peer.Name, local, reply.LocalTimestamp)
        return reply.Status
 }
 
@@ -159,3 +170,7 @@ func newRPCConn(endpoints []string) (*grpc.ClientConn, error) {
                TLSConfig:   syncerclient.RPClientConfig(),
        })
 }
+
+func Peers() []*PeerInfo {
+       return peerInfos
+}
diff --git a/syncer/service/replicator/replicator.go b/syncer/service/replicator/replicator.go
index d7f1e3d4..52adfc2d 100644
--- a/syncer/service/replicator/replicator.go
+++ b/syncer/service/replicator/replicator.go
@@ -21,16 +21,20 @@ import (
        "context"
        "fmt"
 
+       "github.com/go-chassis/foundation/gopool"
+       "github.com/go-chassis/go-chassis/v2/server/restful"
+       "google.golang.org/grpc"
+       "google.golang.org/grpc/metadata"
+
        "github.com/apache/servicecomb-service-center/client"
        "github.com/apache/servicecomb-service-center/pkg/log"
        "github.com/apache/servicecomb-service-center/pkg/rpc"
        "github.com/apache/servicecomb-service-center/pkg/util"
+       
"github.com/apache/servicecomb-service-center/server/plugin/security/cipher"
        v1sync "github.com/apache/servicecomb-service-center/syncer/api/v1"
        syncerclient 
"github.com/apache/servicecomb-service-center/syncer/client"
        "github.com/apache/servicecomb-service-center/syncer/config"
        
"github.com/apache/servicecomb-service-center/syncer/service/replicator/resource"
-       "github.com/go-chassis/foundation/gopool"
-       "google.golang.org/grpc"
 )
 
 const (
@@ -48,7 +52,8 @@ var (
 )
 
 var (
-       conn *grpc.ClientConn
+       conn      *grpc.ClientConn
+       peerToken = ""
 )
 
 func Work() error {
@@ -68,8 +73,7 @@ func Work() error {
 }
 
 func InitSyncClient() error {
-       cfg := config.GetConfig()
-       peer := cfg.Sync.Peers[0]
+       peer := config.GetConfig().Sync.Peers[0]
        log.Info(fmt.Sprintf("peer is %v", peer))
        var err error
        conn, err = rpc.GetRoundRobinLbConn(&rpc.Config{
@@ -78,7 +82,19 @@ func InitSyncClient() error {
                ServiceName: serviceName,
                TLSConfig:   syncerclient.RPClientConfig(),
        })
-       return err
+       if err != nil {
+               log.Error("get rpc client failed", err)
+               return err
+       }
+       if !config.GetConfig().Sync.RbacEnabled {
+               return nil
+       }
+       peerToken, err = cipher.Decrypt(peer.Token)
+       if err != nil {
+               log.Error("decrypt peer token failed, use original content", 
err)
+               peerToken = peer.Token
+       }
+       return nil
 }
 
 func Close() {
@@ -161,6 +177,11 @@ func (r *replicatorManager) replicate(ctx context.Context, el *v1sync.EventList)
        }
 
        log.Info(fmt.Sprintf("page count %d to sync", len(els)))
+       if config.GetConfig().Sync.RbacEnabled {
+               ctx = metadata.NewOutgoingContext(ctx, 
metadata.New(map[string]string{
+                       restful.HeaderAuth: "Bearer " + peerToken,
+               }))
+       }
 
        for _, in := range els {
                res, err := set.EventServiceClient.Sync(ctx, in)

Reply via email to