This is an automated email from the ASF dual-hosted git repository. zenlin pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/servicecomb-service-center.git
The following commit(s) were added to refs/heads/master by this push: new ab569ed Change moduleServer interface definition to simplify code ab569ed is described below commit ab569eda67981a813942b1b92e7b2be099ba5519 Author: chinxââ <c54948...@126.com> AuthorDate: Tue Mar 24 23:05:59 2020 +0800 Change moduleServer interface definition to simplify code --- syncer/etcd/agent.go | 14 +++++++------- syncer/grpc/server.go | 14 +++++++------- syncer/pkg/mock/mocksotrage/etcd.go | 7 +++---- syncer/serf/agent.go | 31 +++++++++++-------------------- syncer/server/server.go | 15 ++++++--------- 5 files changed, 34 insertions(+), 47 deletions(-) diff --git a/syncer/etcd/agent.go b/syncer/etcd/agent.go index a52a511..c0607a3 100644 --- a/syncer/etcd/agent.go +++ b/syncer/etcd/agent.go @@ -32,7 +32,7 @@ type Agent struct { conf *Config etcd *embed.Etcd readyCh chan struct{} - errorCh chan error + stopCh chan struct{} } // NewAgent new etcd agent with config @@ -40,7 +40,7 @@ func NewAgent(conf *Config) *Agent { return &Agent{ conf: conf, readyCh: make(chan struct{}), - errorCh: make(chan error), + stopCh: make(chan struct{}), } } @@ -62,13 +62,13 @@ func (a *Agent) Start(ctx context.Context) { // Returns an error when running goroutine fails in the etcd startup process case err = <-etcd.Err(): case <-ctx.Done(): - err = ctx.Err() + err = errors.New("cancel etcd server from context") } } if err != nil { log.Error("start etcd failed", err) - a.errorCh <- err + close(a.stopCh) } } @@ -77,9 +77,9 @@ func (a *Agent) Ready() <-chan struct{} { return a.readyCh } -// Error Returns a channel that will be transmit an etcd error -func (a *Agent) Error() <-chan error { - return a.errorCh +// Error Returns a channel that will be closed when etcd is stopped +func (a *Agent) Stopped() <-chan struct{} { + return a.stopCh } // Storage returns etcd storage diff --git a/syncer/grpc/server.go b/syncer/grpc/server.go index 53007c5..f9aa83f 100644 --- a/syncer/grpc/server.go +++ b/syncer/grpc/server.go @@ -40,7 +40,7 @@ type Server struct { addr string handler GRPCHandler readyCh chan struct{} - errorCh chan error + stopCh chan struct{} tlsConf *tls.Config } @@ -50,7 +50,7 @@ func NewServer(addr string, handler GRPCHandler, tlsConf *tls.Config) *Server { addr: addr, handler: handler, readyCh: make(chan struct{}), - errorCh: make(chan error), + stopCh: make(chan struct{}), tlsConf: tlsConf, } } @@ -75,7 +75,7 @@ func (s *Server) Start(ctx context.Context) { var svc *grpc.Server if s.tlsConf != nil { svc = grpc.NewServer(grpc.Creds(credentials.NewTLS(s.tlsConf))) - }else{ + } else { svc = grpc.NewServer() } @@ -88,7 +88,7 @@ func (s *Server) Start(ctx context.Context) { if err != nil { log.Error("start grpc failed", err) - s.errorCh <- err + close(s.stopCh) return } log.Info("start grpc success") @@ -100,7 +100,7 @@ func (s *Server) Ready() <-chan struct{} { return s.readyCh } -// Error Returns a channel that will be transmit a grpc error -func (s *Server) Error() <-chan error { - return s.errorCh +// Error Returns a channel that will be closed a grpc is stopped +func (s *Server) Stopped() <-chan struct{} { + return s.stopCh } diff --git a/syncer/pkg/mock/mocksotrage/etcd.go b/syncer/pkg/mock/mocksotrage/etcd.go index 3604a16..576af19 100644 --- a/syncer/pkg/mock/mocksotrage/etcd.go +++ b/syncer/pkg/mock/mocksotrage/etcd.go @@ -19,6 +19,7 @@ package mocksotrage import ( "context" + "errors" "fmt" "net/url" "os" @@ -42,10 +43,8 @@ func NewKVServer() (svr *MockServer, err error) { go agent.Start(context.Background()) select { case <-agent.Ready(): - case err = <-agent.Error(): - } - if err != nil { - return nil, err + case <-agent.Stopped(): + return nil, errors.New("start etcd mock server failed") } return &MockServer{agent}, nil } diff --git a/syncer/serf/agent.go b/syncer/serf/agent.go index f1b2e28..59a9d0a 100644 --- a/syncer/serf/agent.go +++ b/syncer/serf/agent.go @@ -31,7 +31,7 @@ type Agent struct { *agent.Agent conf *Config readyCh chan struct{} - errorCh chan error + stopCh chan struct{} } // Create create serf agent with config @@ -51,26 +51,21 @@ func Create(conf *Config) (*Agent, error) { Agent: serfAgent, conf: conf, readyCh: make(chan struct{}), - errorCh: make(chan error), + stopCh: make(chan struct{}), }, nil } // Start agent func (a *Agent) Start(ctx context.Context) { err := a.Agent.Start() - if err != nil { - log.Errorf(err, "start serf agent failed") - a.errorCh <- err - return + if err == nil { + a.RegisterEventHandler(a) + err = a.retryJoin(ctx) } - a.RegisterEventHandler(a) - err = a.retryJoin(ctx) if err != nil { log.Errorf(err, "start serf agent failed") - if err != ctx.Err() && a.errorCh != nil { - a.errorCh <- err - } + close(a.stopCh) } } @@ -97,19 +92,15 @@ func (a *Agent) Ready() <-chan struct{} { return a.readyCh } -// Error Returns a channel that will be transmit a serf error -func (a *Agent) Error() <-chan error { - return a.errorCh +// Error Returns a channel that will be closed when serf is stopped +func (a *Agent) Stopped() <-chan struct{} { + return a.stopCh } // Stop serf agent func (a *Agent) Stop() { - if a.errorCh != nil { - a.Leave() - a.Shutdown() - close(a.errorCh) - a.errorCh = nil - } + a.Leave() + a.Shutdown() } // LocalMember returns the Member information for the local node diff --git a/syncer/server/server.go b/syncer/server/server.go index 17a2b60..3d04767 100644 --- a/syncer/server/server.go +++ b/syncer/server/server.go @@ -56,8 +56,8 @@ type moduleServer interface { // Returns a channel that will be closed when the module server is ready Ready() <-chan struct{} - // Returns a channel that will be transmit a module server error - Error() <-chan error + // Returns a channel that will be closed when the module server is stopped + Stopped() <-chan struct{} } // Server struct for syncer @@ -105,25 +105,21 @@ func (s *Server) Run(ctx context.Context) { err = s.startModuleServer(s.agent) if err != nil { - s.Stop() return } err = s.configureCluster() if err != nil { - s.Stop() return } err = s.startModuleServer(s.etcd) if err != nil { - s.Stop() return } err = s.startModuleServer(s.grpc) if err != nil { - s.Stop() return } @@ -170,11 +166,12 @@ func (s *Server) startModuleServer(module moduleServer) (err error) { gopool.Go(module.Start) select { case <-module.Ready(): - case err = <-module.Error(): + return nil + case <-module.Stopped(): case <-s.stopCh: - err = stopChanErr } - return err + s.Stop() + return stopChanErr } // initialization Initialize the starter of the syncer