This is an automated email from the ASF dual-hosted git repository.
zenlin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/servicecomb-service-center.git
The following commit(s) were added to refs/heads/master by this push:
new ab569ed Change moduleServer interface definition to simplify code
ab569ed is described below
commit ab569eda67981a813942b1b92e7b2be099ba5519
Author: chinxââ <[email protected]>
AuthorDate: Tue Mar 24 23:05:59 2020 +0800
Change moduleServer interface definition to simplify code
---
syncer/etcd/agent.go | 14 +++++++-------
syncer/grpc/server.go | 14 +++++++-------
syncer/pkg/mock/mocksotrage/etcd.go | 7 +++----
syncer/serf/agent.go | 31 +++++++++++--------------------
syncer/server/server.go | 15 ++++++---------
5 files changed, 34 insertions(+), 47 deletions(-)
diff --git a/syncer/etcd/agent.go b/syncer/etcd/agent.go
index a52a511..c0607a3 100644
--- a/syncer/etcd/agent.go
+++ b/syncer/etcd/agent.go
@@ -32,7 +32,7 @@ type Agent struct {
conf *Config
etcd *embed.Etcd
readyCh chan struct{}
- errorCh chan error
+ stopCh chan struct{}
}
// NewAgent new etcd agent with config
@@ -40,7 +40,7 @@ func NewAgent(conf *Config) *Agent {
return &Agent{
conf: conf,
readyCh: make(chan struct{}),
- errorCh: make(chan error),
+ stopCh: make(chan struct{}),
}
}
@@ -62,13 +62,13 @@ func (a *Agent) Start(ctx context.Context) {
// Returns an error when running goroutine fails in the etcd startup process
case err = <-etcd.Err():
case <-ctx.Done():
- err = ctx.Err()
+ err = errors.New("cancel etcd server from context")
}
}
if err != nil {
log.Error("start etcd failed", err)
- a.errorCh <- err
+ close(a.stopCh)
}
}
@@ -77,9 +77,9 @@ func (a *Agent) Ready() <-chan struct{} {
return a.readyCh
}
-// Error Returns a channel that will be transmit an etcd error
-func (a *Agent) Error() <-chan error {
- return a.errorCh
+// Stopped returns a channel that will be closed when etcd is stopped
+func (a *Agent) Stopped() <-chan struct{} {
+ return a.stopCh
}
// Storage returns etcd storage
diff --git a/syncer/grpc/server.go b/syncer/grpc/server.go
index 53007c5..f9aa83f 100644
--- a/syncer/grpc/server.go
+++ b/syncer/grpc/server.go
@@ -40,7 +40,7 @@ type Server struct {
addr string
handler GRPCHandler
readyCh chan struct{}
- errorCh chan error
+ stopCh chan struct{}
tlsConf *tls.Config
}
@@ -50,7 +50,7 @@ func NewServer(addr string, handler GRPCHandler, tlsConf *tls.Config) *Server {
addr: addr,
handler: handler,
readyCh: make(chan struct{}),
- errorCh: make(chan error),
+ stopCh: make(chan struct{}),
tlsConf: tlsConf,
}
}
@@ -75,7 +75,7 @@ func (s *Server) Start(ctx context.Context) {
var svc *grpc.Server
if s.tlsConf != nil {
svc = grpc.NewServer(grpc.Creds(credentials.NewTLS(s.tlsConf)))
- }else{
+ } else {
svc = grpc.NewServer()
}
@@ -88,7 +88,7 @@ func (s *Server) Start(ctx context.Context) {
if err != nil {
log.Error("start grpc failed", err)
- s.errorCh <- err
+ close(s.stopCh)
return
}
log.Info("start grpc success")
@@ -100,7 +100,7 @@ func (s *Server) Ready() <-chan struct{} {
return s.readyCh
}
-// Error Returns a channel that will be transmit a grpc error
-func (s *Server) Error() <-chan error {
- return s.errorCh
+// Stopped returns a channel that will be closed when grpc is stopped
+func (s *Server) Stopped() <-chan struct{} {
+ return s.stopCh
}
diff --git a/syncer/pkg/mock/mocksotrage/etcd.go b/syncer/pkg/mock/mocksotrage/etcd.go
index 3604a16..576af19 100644
--- a/syncer/pkg/mock/mocksotrage/etcd.go
+++ b/syncer/pkg/mock/mocksotrage/etcd.go
@@ -19,6 +19,7 @@ package mocksotrage
import (
"context"
+ "errors"
"fmt"
"net/url"
"os"
@@ -42,10 +43,8 @@ func NewKVServer() (svr *MockServer, err error) {
go agent.Start(context.Background())
select {
case <-agent.Ready():
- case err = <-agent.Error():
- }
- if err != nil {
- return nil, err
+ case <-agent.Stopped():
+ return nil, errors.New("start etcd mock server failed")
}
return &MockServer{agent}, nil
}
diff --git a/syncer/serf/agent.go b/syncer/serf/agent.go
index f1b2e28..59a9d0a 100644
--- a/syncer/serf/agent.go
+++ b/syncer/serf/agent.go
@@ -31,7 +31,7 @@ type Agent struct {
*agent.Agent
conf *Config
readyCh chan struct{}
- errorCh chan error
+ stopCh chan struct{}
}
// Create create serf agent with config
@@ -51,26 +51,21 @@ func Create(conf *Config) (*Agent, error) {
Agent: serfAgent,
conf: conf,
readyCh: make(chan struct{}),
- errorCh: make(chan error),
+ stopCh: make(chan struct{}),
}, nil
}
// Start agent
func (a *Agent) Start(ctx context.Context) {
err := a.Agent.Start()
- if err != nil {
- log.Errorf(err, "start serf agent failed")
- a.errorCh <- err
- return
+ if err == nil {
+ a.RegisterEventHandler(a)
+ err = a.retryJoin(ctx)
}
- a.RegisterEventHandler(a)
- err = a.retryJoin(ctx)
if err != nil {
log.Errorf(err, "start serf agent failed")
- if err != ctx.Err() && a.errorCh != nil {
- a.errorCh <- err
- }
+ close(a.stopCh)
}
}
@@ -97,19 +92,15 @@ func (a *Agent) Ready() <-chan struct{} {
return a.readyCh
}
-// Error Returns a channel that will be transmit a serf error
-func (a *Agent) Error() <-chan error {
- return a.errorCh
+// Stopped returns a channel that will be closed when serf is stopped
+func (a *Agent) Stopped() <-chan struct{} {
+ return a.stopCh
}
// Stop serf agent
func (a *Agent) Stop() {
- if a.errorCh != nil {
- a.Leave()
- a.Shutdown()
- close(a.errorCh)
- a.errorCh = nil
- }
+ a.Leave()
+ a.Shutdown()
}
// LocalMember returns the Member information for the local node
diff --git a/syncer/server/server.go b/syncer/server/server.go
index 17a2b60..3d04767 100644
--- a/syncer/server/server.go
+++ b/syncer/server/server.go
@@ -56,8 +56,8 @@ type moduleServer interface {
// Returns a channel that will be closed when the module server is ready
Ready() <-chan struct{}
- // Returns a channel that will be transmit a module server error
- Error() <-chan error
+ // Returns a channel that will be closed when the module server is stopped
+ Stopped() <-chan struct{}
}
// Server struct for syncer
@@ -105,25 +105,21 @@ func (s *Server) Run(ctx context.Context) {
err = s.startModuleServer(s.agent)
if err != nil {
- s.Stop()
return
}
err = s.configureCluster()
if err != nil {
- s.Stop()
return
}
err = s.startModuleServer(s.etcd)
if err != nil {
- s.Stop()
return
}
err = s.startModuleServer(s.grpc)
if err != nil {
- s.Stop()
return
}
@@ -170,11 +166,12 @@ func (s *Server) startModuleServer(module moduleServer) (err error) {
gopool.Go(module.Start)
select {
case <-module.Ready():
- case err = <-module.Error():
+ return nil
+ case <-module.Stopped():
case <-s.stopCh:
- err = stopChanErr
}
- return err
+ s.Stop()
+ return stopChanErr
}
// initialization Initialize the starter of the syncer