This is an automated email from the ASF dual-hosted git repository. zenlin pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/servicecomb-service-center.git
The following commit(s) were added to refs/heads/master by this push: new d3049c7 Use etcd option to optimize dependencies d3049c7 is described below commit d3049c7439c4ad00311356acb9c586b07aff1b04 Author: chinxââ <c54948...@126.com> AuthorDate: Wed Mar 25 23:28:15 2020 +0800 Use etcd option to optimize dependencies --- syncer/etcd/agent.go | 103 ---------------------------- syncer/etcd/config.go | 69 ------------------- syncer/etcd/etcd.go | 133 ++++++++++++++++++++++++++++++++++++ syncer/etcd/etcd_test.go | 80 ++++++++++++++++++++++ syncer/etcd/option.go | 118 ++++++++++++++++++++++++++++++++ syncer/pkg/mock/mocksotrage/etcd.go | 26 ++++--- syncer/server/convert.go | 19 ++---- syncer/server/server.go | 44 +++++------- 8 files changed, 365 insertions(+), 227 deletions(-) diff --git a/syncer/etcd/agent.go b/syncer/etcd/agent.go deleted file mode 100644 index c0607a3..0000000 --- a/syncer/etcd/agent.go +++ /dev/null @@ -1,103 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package etcd - -import ( - "context" - "errors" - - "github.com/apache/servicecomb-service-center/pkg/log" - "github.com/coreos/etcd/clientv3" - "github.com/coreos/etcd/embed" - "github.com/coreos/etcd/etcdserver/api/v3client" -) - -// Agent warps the embed etcd -type Agent struct { - conf *Config - etcd *embed.Etcd - readyCh chan struct{} - stopCh chan struct{} -} - -// NewAgent new etcd agent with config -func NewAgent(conf *Config) *Agent { - return &Agent{ - conf: conf, - readyCh: make(chan struct{}), - stopCh: make(chan struct{}), - } -} - -// Start etcd agent -func (a *Agent) Start(ctx context.Context) { - etcd, err := embed.StartEtcd(a.conf.Config) - if err == nil { - a.etcd = etcd - select { - // Be returns when the server is readied - case <-etcd.Server.ReadyNotify(): - log.Info("start etcd success") - close(a.readyCh) - - // Be returns when the server is stopped - case <-etcd.Server.StopNotify(): - err = errors.New("unknown error cause start etcd failed, check etcd") - - // Returns an error when running goroutine fails in the etcd startup process - case err = <-etcd.Err(): - case <-ctx.Done(): - err = errors.New("cancel etcd server from context") - } - } - - if err != nil { - log.Error("start etcd failed", err) - close(a.stopCh) - } -} - -// Ready Returns a channel that will be closed when etcd is ready -func (a *Agent) Ready() <-chan struct{} { - return a.readyCh -} - -// Error Returns a channel that will be closed when etcd is stopped -func (a *Agent) Stopped() <-chan struct{} { - return a.stopCh -} - -// Storage returns etcd storage -func (a *Agent) Storage() *clientv3.Client { - return v3client.New(a.etcd.Server) -} - -// Stop etcd agent -func (a *Agent) Stop() { - if a.etcd != nil { - a.etcd.Close() - } -} - -// IsLeader Check leader -func (a *Agent) IsLeader() bool { - if a.etcd == nil || a.etcd.Server == nil { - return false - } - return a.etcd.Server.Leader() == a.etcd.Server.ID() -} diff --git a/syncer/etcd/config.go b/syncer/etcd/config.go deleted file mode 100644 index bdc209f..0000000 --- a/syncer/etcd/config.go +++ /dev/null @@ -1,69 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package etcd - -import ( - "fmt" - "net/url" - "os" - - "github.com/apache/servicecomb-service-center/pkg/log" - "github.com/coreos/etcd/compactor" - "github.com/coreos/etcd/embed" - "github.com/coreos/etcd/etcdserver" -) - -var ( - defaultDataDir = "syncer-data/" - defaultListenPeerAddr = "http://127.0.0.1:39102" -) - -type Config struct { - *embed.Config -} - -// DefaultConfig default configure of etcd -func DefaultConfig() *Config { - etcdConf := embed.NewConfig() - hostname, err := os.Hostname() - if err != nil { - log.Errorf(err, "Error determining hostname: %s", err) - return nil - } - - peer, _ := url.Parse(defaultListenPeerAddr) - etcdConf.ACUrls = nil - etcdConf.LCUrls = nil - etcdConf.APUrls = []url.URL{*peer} - etcdConf.LPUrls = []url.URL{*peer} - - etcdConf.EnableV2 = false - etcdConf.EnablePprof = false - etcdConf.QuotaBackendBytes = etcdserver.MaxQuotaBytes - etcdConf.Dir = defaultDataDir + hostname - etcdConf.Name = hostname - etcdConf.InitialCluster = fmt.Sprintf("%s=%s", hostname, defaultListenPeerAddr) - etcdConf.AutoCompactionMode = compactor.ModePeriodic - etcdConf.AutoCompactionRetention = "1h" - return &Config{Config: etcdConf} -} - -func (c *Config)SetName(name string) { - c.Name = name - c.Dir = defaultDataDir + name -} diff --git a/syncer/etcd/etcd.go b/syncer/etcd/etcd.go new file mode 100644 index 0000000..b0a10a5 --- /dev/null +++ b/syncer/etcd/etcd.go @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package etcd + +import ( + "context" + + "github.com/apache/servicecomb-service-center/pkg/log" + "github.com/apache/servicecomb-service-center/syncer/pkg/utils" + "github.com/coreos/etcd/clientv3" + "github.com/coreos/etcd/embed" + "github.com/coreos/etcd/etcdserver/api/v3client" + "github.com/pkg/errors" +) + +// Server etcd server +type Server struct { + conf *embed.Config + etcd *embed.Etcd + running *utils.AtomicBool + + readyCh chan struct{} + stopCh chan struct{} +} + +// NewServer new etcd server with options +func NewServer(ops ...Option) (*Server, error) { + conf, err := toEtcdConfig(ops...) + if err != nil { + return nil, errors.Wrap(err, "options to etcd config failed") + } + return &Server{ + conf: conf, + running: utils.NewAtomicBool(false), + readyCh: make(chan struct{}), + stopCh: make(chan struct{}), + }, nil +} + +// Start etcd server +func (s *Server) Start(ctx context.Context) { + s.running.DoToReverse(false, func() { + etcd, err := embed.StartEtcd(s.conf) + if err != nil { + log.Error("etcd: start server failed", err) + close(s.stopCh) + return + } + s.etcd = etcd + go s.waitNotify(ctx) + }) +} + +// AddOptions add some options when server not running +func (s *Server) AddOptions(ops ...Option) error { + if s.running.Bool() { + return errors.New("etcd server was running") + } + return mergeConfig(s.conf, toConfig(ops...)) +} + +// Ready Returns a channel that will be closed when etcd is ready +func (s *Server) Ready() <-chan struct{} { + return s.readyCh +} + +// Stopped Returns a channel that will be closed when etcd is stopped +func (s *Server) Stopped() <-chan struct{} { + return s.stopCh +} + +// Storage returns etcd storage +func (s *Server) Storage() *clientv3.Client { + return v3client.New(s.etcd.Server) +} + +// IsLeader Check leader +func (s *Server) IsLeader() bool { + if s.etcd == nil || s.etcd.Server == nil { + return false + } + return s.etcd.Server.Leader() == s.etcd.Server.ID() +} + +// Stop etcd server +func (s *Server) Stop() { + s.running.DoToReverse(true, func() { + if s.etcd != nil { + log.Info("etcd: begin shutdown") + s.etcd.Close() + close(s.stopCh) + } + log.Info("etcd: shutdown complete") + }) +} + +func (s *Server) waitNotify(ctx context.Context) { + select { + // Be returns when the server is readied + case <-s.etcd.Server.ReadyNotify(): + log.Info("etcd: start server success") + close(s.readyCh) + + // Be returns when the server is stopped + case <-s.etcd.Server.StopNotify(): + log.Warn("etcd: server stopped, quitting") + s.Stop() + + // Returns an error when running goroutine fails in the etcd startup process + case err := <-s.etcd.Err(): + log.Error("etcd: server happened error, quitting", err) + s.Stop() + + case <-ctx.Done(): + log.Warn("etcd: cancel server by context") + s.Stop() + } +} diff --git a/syncer/etcd/etcd_test.go b/syncer/etcd/etcd_test.go new file mode 100644 index 0000000..4032092 --- /dev/null +++ b/syncer/etcd/etcd_test.go @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package etcd + +import ( + "context" + "os" + "testing" + "time" + + "github.com/pkg/errors" + "github.com/stretchr/testify/assert" +) + +func TestETCDServer(t *testing.T) { + defer os.RemoveAll("test-data") + svr, err := NewServer( + WithName("test"), + WithDataDir("test-data/a"), + WithPeerAddr("127.0.0.1:8090"), + ) + assert.Nil(t, err) + + ctx, cancel := context.WithCancel(context.Background()) + err = startServer(ctx, svr) + assert.Nil(t, err) + + svr.IsLeader() + svr.Storage() + + err = svr.AddOptions(WithAddPeers("defalt", "127.0.0.1:8092")) + cancel() + assert.NotNil(t, err) + + svr.Stop() + + svr, err = NewServer( + WithName("test"), + WithDataDir("test-data/b"), + WithPeerAddr("127.0.0.1:8091"), + ) + assert.Nil(t, err) + + err = svr.AddOptions(WithAddPeers("defalt", "127.0.0.1:8092")) + assert.Nil(t, err) + + ctx, cancel = context.WithCancel(context.Background()) + err = startServer(ctx, svr) + cancel() + assert.NotNil(t, err) + + <-time.After(time.Second * 3) +} + +func startServer(ctx context.Context, svr *Server) (err error) { + svr.Start(ctx) + select { + case <-svr.Ready(): + case <-svr.Stopped(): + err = errors.New("start etcd server failed") + case <-time.After(time.Second * 3): + err = errors.New("start etcd server timeout") + } + return +} diff --git a/syncer/etcd/option.go b/syncer/etcd/option.go new file mode 100644 index 0000000..1a951bf --- /dev/null +++ b/syncer/etcd/option.go @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package etcd + +import ( + "net/url" + + "github.com/apache/servicecomb-service-center/pkg/log" + "github.com/apache/servicecomb-service-center/pkg/tlsutil" + "github.com/coreos/etcd/compactor" + "github.com/coreos/etcd/embed" + "github.com/coreos/etcd/etcdserver" +) + +type config struct { + name string + dataDir string + peerAddr string + tlsConf *tlsutil.SSLConfig + peers map[string]string +} + +// Option to etcd config +type Option func(*config) + +// WithName returns name option +func WithName(name string) Option { + return func(c *config) { c.name = name } +} + +// WithDataDir returns data dir option +func WithDataDir(dir string) Option { + return func(c *config) { c.dataDir = dir } +} + +// WithPeerAddr returns peer address option +func WithPeerAddr(peerAddr string) Option { + return func(c *config) { c.peerAddr = peerAddr } +} + +// WithAddPeers returns add peers option +func WithAddPeers(name, addr string) Option { + return func(c *config) { + c.peers[name] = addr + } +} + +func toConfig(ops ...Option) *config { + c := &config{peers: map[string]string{}} + for _, op := range ops { + op(c) + } + return c +} + +func toEtcdConfig(ops ...Option) (*embed.Config, error) { + conf := embed.NewConfig() + conf.EnableV2 = false + conf.EnablePprof = false + conf.QuotaBackendBytes = etcdserver.MaxQuotaBytes + conf.AutoCompactionMode = compactor.ModePeriodic + conf.AutoCompactionRetention = "1h" + + conf.ACUrls = nil + conf.LCUrls = nil + + err := mergeConfig(conf, toConfig(ops...)) + if err != nil { + return nil, err + } + return conf, nil +} + +func mergeConfig(src *embed.Config, dst *config) error { + if dst.name != "" { + src.Name = dst.name + } + + if dst.dataDir != "" { + src.Dir = dst.dataDir + } + + proto := "http://" + if dst.peerAddr != "" { + peer, err := url.Parse(proto + dst.peerAddr) + if err != nil { + log.Errorf(err, "parse peer listener '%s' failed", dst.peerAddr) + return err + } + src.APUrls = []url.URL{*peer} + src.LPUrls = []url.URL{*peer} + src.InitialCluster = src.Name + "=" + peer.String() + } + + if len(dst.peers) > 0 { + initialCluster := "" + for key, val := range dst.peers { + initialCluster += key + "=" + proto + val + "," + } + src.InitialCluster = initialCluster[:len(initialCluster)-1] + } + return nil +} diff --git a/syncer/pkg/mock/mocksotrage/etcd.go b/syncer/pkg/mock/mocksotrage/etcd.go index 576af19..c53e3ad 100644 --- a/syncer/pkg/mock/mocksotrage/etcd.go +++ b/syncer/pkg/mock/mocksotrage/etcd.go @@ -20,8 +20,6 @@ package mocksotrage import ( "context" "errors" - "fmt" - "net/url" "os" "github.com/apache/servicecomb-service-center/syncer/etcd" @@ -31,15 +29,18 @@ import ( const ( defaultName = "etcd_mock" defaultDataDir = "mock-data/" - defaultListenPeerAddr = "http://127.0.0.1:30993" + defaultListenPeerAddr = "127.0.0.1:30993" ) type MockServer struct { - etcd *etcd.Agent + etcd *etcd.Server } func NewKVServer() (svr *MockServer, err error) { - agent := etcd.NewAgent(defaultConfig()) + agent, err1 := etcd.NewServer(defaultOptions()...) + if err1 != nil { + return nil, err + } go agent.Start(context.Background()) select { case <-agent.Ready(): @@ -58,13 +59,10 @@ func (m *MockServer) Stop() { os.RemoveAll(defaultDataDir) } -func defaultConfig() *etcd.Config { - peer, _ := url.Parse(defaultListenPeerAddr) - conf := etcd.DefaultConfig() - conf.Name = defaultName - conf.Dir = defaultDataDir + defaultName - conf.APUrls = []url.URL{*peer} - conf.LPUrls = []url.URL{*peer} - conf.InitialCluster = fmt.Sprintf("%s=%s", defaultName, defaultListenPeerAddr) - return conf +func defaultOptions() []etcd.Option { + return []etcd.Option{ + etcd.WithPeerAddr(defaultListenPeerAddr), + etcd.WithName(defaultName), + etcd.WithDataDir(defaultDataDir + defaultName), + } } diff --git a/syncer/server/convert.go b/syncer/server/convert.go index 45e3024..d9d2ccc 100644 --- a/syncer/server/convert.go +++ b/syncer/server/convert.go @@ -19,7 +19,6 @@ package server import ( "crypto/tls" - "net/url" "strings" "time" @@ -49,20 +48,12 @@ func convertSerfConfig(c *config.Config) *serf.Config { return conf } -func convertEtcdConfig(c *config.Config) *etcd.Config { - conf := etcd.DefaultConfig() - conf.Name = c.Node - conf.Dir = c.DataDir - proto := "http://" - - if c.Listener.TLSMount.Enabled { - proto = "https://" +func convertEtcdOptions(c *config.Config) []etcd.Option { + return []etcd.Option{ + etcd.WithName(c.Node), + etcd.WithDataDir(c.DataDir), + etcd.WithPeerAddr(c.Listener.PeerAddr), } - - peer, _ := url.Parse(proto + c.Listener.PeerAddr) - conf.APUrls = []url.URL{*peer} - conf.LPUrls = []url.URL{*peer} - return conf } func convertTaskOptions(c *config.Config) []task.Option { diff --git a/syncer/server/server.go b/syncer/server/server.go index 3d04767..5e81b19 100644 --- a/syncer/server/server.go +++ b/syncer/server/server.go @@ -14,13 +14,13 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package server import ( "context" "crypto/tls" "errors" - "net/url" "strconv" "syscall" @@ -62,6 +62,8 @@ type moduleServer interface { // Server struct for syncer type Server struct { + ctx context.Context + cancel context.CancelFunc // Syncer configuration conf *config.Config @@ -71,8 +73,7 @@ type Server struct { // Wrap the servicecenter servicecenter servicecenter.Servicecenter - etcd *etcd.Agent - etcdConf *etcd.Config + etcd *etcd.Server // Wraps the serf agent agent *serf.Agent @@ -86,7 +87,10 @@ type Server struct { // NewServer new server with Config func NewServer(conf *config.Config) *Server { + ctx, cancel := context.WithCancel(context.Background()) return &Server{ + ctx: ctx, + cancel: cancel, conf: conf, stopCh: make(chan struct{}), } @@ -158,6 +162,8 @@ func (s *Server) Stop() { s.etcd.Stop() } + s.cancel() + // Closes all goroutines in the pool gopool.CloseAndWait() } @@ -191,8 +197,11 @@ func (s *Server) initialization() (err error) { return } - s.etcdConf = convertEtcdConfig(s.conf) - s.etcd = etcd.NewAgent(s.etcdConf) + s.etcd, err = etcd.NewServer(convertEtcdOptions(s.conf)...) + if err != nil { + log.Errorf(err, "Create etcd failed, %s", err) + return + } s.task, err = task.GenerateTasker(s.conf.Task.Kind, convertTaskOptions(s.conf)...) if err != nil { @@ -229,34 +238,15 @@ func (s *Server) initPlugin() { // configureCluster Configuring the cluster by serf group member information func (s *Server) configureCluster() error { - proto := "http" - if s.conf.Listener.TLSMount.Enabled { - proto = "https" - } - initialCluster := "" - // get local member of serf self := s.agent.LocalMember() _, peerPort, _ := utils.SplitAddress(s.conf.Listener.PeerAddr) - peerUrl, err := url.Parse(proto + "://" + self.Addr.String() + ":" + strconv.Itoa(peerPort)) - if err != nil { - log.Error("parse url from serf local member failed", err) - return err - } + ops := []etcd.Option{etcd.WithPeerAddr(self.Addr.String() + ":" + strconv.Itoa(peerPort))} // group members from serf as initial cluster members for _, member := range s.agent.GroupMembers(s.conf.Cluster) { - initialCluster += member.Name + "=" + proto + "://" + member.Addr.String() + ":" + member.Tags[serf.TagKeyClusterPort] + "," + ops = append(ops, etcd.WithAddPeers(member.Name, member.Addr.String()+":"+member.Tags[serf.TagKeyClusterPort])) } - leng := len(initialCluster) - if leng == 0 { - err = errors.New("serf group members is empty") - log.Error("etcd peer not found", err) - return err - } - s.etcdConf.APUrls = []url.URL{*peerUrl} - s.etcdConf.LPUrls = []url.URL{*peerUrl} - s.etcdConf.InitialCluster = initialCluster[:len(initialCluster)-1] - return nil + return s.etcd.AddOptions(ops...) }