This is an automated email from the ASF dual-hosted git repository.
zenlin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/servicecomb-service-center.git
The following commit(s) were added to refs/heads/master by this push:
new d3049c7 Use etcd option to optimize dependencies
d3049c7 is described below
commit d3049c7439c4ad00311356acb9c586b07aff1b04
Author: chinxââ <[email protected]>
AuthorDate: Wed Mar 25 23:28:15 2020 +0800
Use etcd option to optimize dependencies
---
syncer/etcd/agent.go | 103 ----------------------------
syncer/etcd/config.go | 69 -------------------
syncer/etcd/etcd.go | 133 ++++++++++++++++++++++++++++++++++++
syncer/etcd/etcd_test.go | 80 ++++++++++++++++++++++
syncer/etcd/option.go | 118 ++++++++++++++++++++++++++++++++
syncer/pkg/mock/mocksotrage/etcd.go | 26 ++++---
syncer/server/convert.go | 19 ++----
syncer/server/server.go | 44 +++++-------
8 files changed, 365 insertions(+), 227 deletions(-)
diff --git a/syncer/etcd/agent.go b/syncer/etcd/agent.go
deleted file mode 100644
index c0607a3..0000000
--- a/syncer/etcd/agent.go
+++ /dev/null
@@ -1,103 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package etcd
-
-import (
- "context"
- "errors"
-
- "github.com/apache/servicecomb-service-center/pkg/log"
- "github.com/coreos/etcd/clientv3"
- "github.com/coreos/etcd/embed"
- "github.com/coreos/etcd/etcdserver/api/v3client"
-)
-
-// Agent warps the embed etcd
-type Agent struct {
- conf *Config
- etcd *embed.Etcd
- readyCh chan struct{}
- stopCh chan struct{}
-}
-
-// NewAgent new etcd agent with config
-func NewAgent(conf *Config) *Agent {
- return &Agent{
- conf: conf,
- readyCh: make(chan struct{}),
- stopCh: make(chan struct{}),
- }
-}
-
-// Start etcd agent
-func (a *Agent) Start(ctx context.Context) {
- etcd, err := embed.StartEtcd(a.conf.Config)
- if err == nil {
- a.etcd = etcd
- select {
- // Be returns when the server is readied
- case <-etcd.Server.ReadyNotify():
- log.Info("start etcd success")
- close(a.readyCh)
-
- // Be returns when the server is stopped
- case <-etcd.Server.StopNotify():
- err = errors.New("unknown error cause start etcd
failed, check etcd")
-
- // Returns an error when running goroutine fails in the etcd
startup process
- case err = <-etcd.Err():
- case <-ctx.Done():
- err = errors.New("cancel etcd server from context")
- }
- }
-
- if err != nil {
- log.Error("start etcd failed", err)
- close(a.stopCh)
- }
-}
-
-// Ready Returns a channel that will be closed when etcd is ready
-func (a *Agent) Ready() <-chan struct{} {
- return a.readyCh
-}
-
-// Error Returns a channel that will be closed when etcd is stopped
-func (a *Agent) Stopped() <-chan struct{} {
- return a.stopCh
-}
-
-// Storage returns etcd storage
-func (a *Agent) Storage() *clientv3.Client {
- return v3client.New(a.etcd.Server)
-}
-
-// Stop etcd agent
-func (a *Agent) Stop() {
- if a.etcd != nil {
- a.etcd.Close()
- }
-}
-
-// IsLeader Check leader
-func (a *Agent) IsLeader() bool {
- if a.etcd == nil || a.etcd.Server == nil {
- return false
- }
- return a.etcd.Server.Leader() == a.etcd.Server.ID()
-}
diff --git a/syncer/etcd/config.go b/syncer/etcd/config.go
deleted file mode 100644
index bdc209f..0000000
--- a/syncer/etcd/config.go
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package etcd
-
-import (
- "fmt"
- "net/url"
- "os"
-
- "github.com/apache/servicecomb-service-center/pkg/log"
- "github.com/coreos/etcd/compactor"
- "github.com/coreos/etcd/embed"
- "github.com/coreos/etcd/etcdserver"
-)
-
-var (
- defaultDataDir = "syncer-data/"
- defaultListenPeerAddr = "http://127.0.0.1:39102"
-)
-
-type Config struct {
- *embed.Config
-}
-
-// DefaultConfig default configure of etcd
-func DefaultConfig() *Config {
- etcdConf := embed.NewConfig()
- hostname, err := os.Hostname()
- if err != nil {
- log.Errorf(err, "Error determining hostname: %s", err)
- return nil
- }
-
- peer, _ := url.Parse(defaultListenPeerAddr)
- etcdConf.ACUrls = nil
- etcdConf.LCUrls = nil
- etcdConf.APUrls = []url.URL{*peer}
- etcdConf.LPUrls = []url.URL{*peer}
-
- etcdConf.EnableV2 = false
- etcdConf.EnablePprof = false
- etcdConf.QuotaBackendBytes = etcdserver.MaxQuotaBytes
- etcdConf.Dir = defaultDataDir + hostname
- etcdConf.Name = hostname
- etcdConf.InitialCluster = fmt.Sprintf("%s=%s", hostname,
defaultListenPeerAddr)
- etcdConf.AutoCompactionMode = compactor.ModePeriodic
- etcdConf.AutoCompactionRetention = "1h"
- return &Config{Config: etcdConf}
-}
-
-func (c *Config)SetName(name string) {
- c.Name = name
- c.Dir = defaultDataDir + name
-}
diff --git a/syncer/etcd/etcd.go b/syncer/etcd/etcd.go
new file mode 100644
index 0000000..b0a10a5
--- /dev/null
+++ b/syncer/etcd/etcd.go
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package etcd
+
+import (
+ "context"
+
+ "github.com/apache/servicecomb-service-center/pkg/log"
+ "github.com/apache/servicecomb-service-center/syncer/pkg/utils"
+ "github.com/coreos/etcd/clientv3"
+ "github.com/coreos/etcd/embed"
+ "github.com/coreos/etcd/etcdserver/api/v3client"
+ "github.com/pkg/errors"
+)
+
+// Server etcd server
+type Server struct {
+ conf *embed.Config
+ etcd *embed.Etcd
+ running *utils.AtomicBool
+
+ readyCh chan struct{}
+ stopCh chan struct{}
+}
+
+// NewServer new etcd server with options
+func NewServer(ops ...Option) (*Server, error) {
+ conf, err := toEtcdConfig(ops...)
+ if err != nil {
+ return nil, errors.Wrap(err, "options to etcd config failed")
+ }
+ return &Server{
+ conf: conf,
+ running: utils.NewAtomicBool(false),
+ readyCh: make(chan struct{}),
+ stopCh: make(chan struct{}),
+ }, nil
+}
+
+// Start etcd server
+func (s *Server) Start(ctx context.Context) {
+ s.running.DoToReverse(false, func() {
+ etcd, err := embed.StartEtcd(s.conf)
+ if err != nil {
+ log.Error("etcd: start server failed", err)
+ close(s.stopCh)
+ return
+ }
+ s.etcd = etcd
+ go s.waitNotify(ctx)
+ })
+}
+
+// AddOptions add some options when server not running
+func (s *Server) AddOptions(ops ...Option) error {
+ if s.running.Bool() {
+ return errors.New("etcd server was running")
+ }
+ return mergeConfig(s.conf, toConfig(ops...))
+}
+
+// Ready Returns a channel that will be closed when etcd is ready
+func (s *Server) Ready() <-chan struct{} {
+ return s.readyCh
+}
+
+// Stopped Returns a channel that will be closed when etcd is stopped
+func (s *Server) Stopped() <-chan struct{} {
+ return s.stopCh
+}
+
+// Storage returns etcd storage
+func (s *Server) Storage() *clientv3.Client {
+ return v3client.New(s.etcd.Server)
+}
+
+// IsLeader Check leader
+func (s *Server) IsLeader() bool {
+ if s.etcd == nil || s.etcd.Server == nil {
+ return false
+ }
+ return s.etcd.Server.Leader() == s.etcd.Server.ID()
+}
+
+// Stop etcd server
+func (s *Server) Stop() {
+ s.running.DoToReverse(true, func() {
+ if s.etcd != nil {
+ log.Info("etcd: begin shutdown")
+ s.etcd.Close()
+ close(s.stopCh)
+ }
+ log.Info("etcd: shutdown complete")
+ })
+}
+
+func (s *Server) waitNotify(ctx context.Context) {
+ select {
+ // Be returns when the server is readied
+ case <-s.etcd.Server.ReadyNotify():
+ log.Info("etcd: start server success")
+ close(s.readyCh)
+
+ // Be returns when the server is stopped
+ case <-s.etcd.Server.StopNotify():
+ log.Warn("etcd: server stopped, quitting")
+ s.Stop()
+
+ // Returns an error when running goroutine fails in the etcd startup
process
+ case err := <-s.etcd.Err():
+ log.Error("etcd: server happened error, quitting", err)
+ s.Stop()
+
+ case <-ctx.Done():
+ log.Warn("etcd: cancel server by context")
+ s.Stop()
+ }
+}
diff --git a/syncer/etcd/etcd_test.go b/syncer/etcd/etcd_test.go
new file mode 100644
index 0000000..4032092
--- /dev/null
+++ b/syncer/etcd/etcd_test.go
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package etcd
+
+import (
+ "context"
+ "os"
+ "testing"
+ "time"
+
+ "github.com/pkg/errors"
+ "github.com/stretchr/testify/assert"
+)
+
+func TestETCDServer(t *testing.T) {
+ defer os.RemoveAll("test-data")
+ svr, err := NewServer(
+ WithName("test"),
+ WithDataDir("test-data/a"),
+ WithPeerAddr("127.0.0.1:8090"),
+ )
+ assert.Nil(t, err)
+
+ ctx, cancel := context.WithCancel(context.Background())
+ err = startServer(ctx, svr)
+ assert.Nil(t, err)
+
+ svr.IsLeader()
+ svr.Storage()
+
+ err = svr.AddOptions(WithAddPeers("defalt", "127.0.0.1:8092"))
+ cancel()
+ assert.NotNil(t, err)
+
+ svr.Stop()
+
+ svr, err = NewServer(
+ WithName("test"),
+ WithDataDir("test-data/b"),
+ WithPeerAddr("127.0.0.1:8091"),
+ )
+ assert.Nil(t, err)
+
+ err = svr.AddOptions(WithAddPeers("defalt", "127.0.0.1:8092"))
+ assert.Nil(t, err)
+
+ ctx, cancel = context.WithCancel(context.Background())
+ err = startServer(ctx, svr)
+ cancel()
+ assert.NotNil(t, err)
+
+ <-time.After(time.Second * 3)
+}
+
+func startServer(ctx context.Context, svr *Server) (err error) {
+ svr.Start(ctx)
+ select {
+ case <-svr.Ready():
+ case <-svr.Stopped():
+ err = errors.New("start etcd server failed")
+ case <-time.After(time.Second * 3):
+ err = errors.New("start etcd server timeout")
+ }
+ return
+}
diff --git a/syncer/etcd/option.go b/syncer/etcd/option.go
new file mode 100644
index 0000000..1a951bf
--- /dev/null
+++ b/syncer/etcd/option.go
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package etcd
+
+import (
+ "net/url"
+
+ "github.com/apache/servicecomb-service-center/pkg/log"
+ "github.com/apache/servicecomb-service-center/pkg/tlsutil"
+ "github.com/coreos/etcd/compactor"
+ "github.com/coreos/etcd/embed"
+ "github.com/coreos/etcd/etcdserver"
+)
+
+type config struct {
+ name string
+ dataDir string
+ peerAddr string
+ tlsConf *tlsutil.SSLConfig
+ peers map[string]string
+}
+
+// Option to etcd config
+type Option func(*config)
+
+// WithName returns name option
+func WithName(name string) Option {
+ return func(c *config) { c.name = name }
+}
+
+// WithDataDir returns data dir option
+func WithDataDir(dir string) Option {
+ return func(c *config) { c.dataDir = dir }
+}
+
+// WithPeerAddr returns peer address option
+func WithPeerAddr(peerAddr string) Option {
+ return func(c *config) { c.peerAddr = peerAddr }
+}
+
+// WithAddPeers returns add peers option
+func WithAddPeers(name, addr string) Option {
+ return func(c *config) {
+ c.peers[name] = addr
+ }
+}
+
+func toConfig(ops ...Option) *config {
+ c := &config{peers: map[string]string{}}
+ for _, op := range ops {
+ op(c)
+ }
+ return c
+}
+
+func toEtcdConfig(ops ...Option) (*embed.Config, error) {
+ conf := embed.NewConfig()
+ conf.EnableV2 = false
+ conf.EnablePprof = false
+ conf.QuotaBackendBytes = etcdserver.MaxQuotaBytes
+ conf.AutoCompactionMode = compactor.ModePeriodic
+ conf.AutoCompactionRetention = "1h"
+
+ conf.ACUrls = nil
+ conf.LCUrls = nil
+
+ err := mergeConfig(conf, toConfig(ops...))
+ if err != nil {
+ return nil, err
+ }
+ return conf, nil
+}
+
+func mergeConfig(src *embed.Config, dst *config) error {
+ if dst.name != "" {
+ src.Name = dst.name
+ }
+
+ if dst.dataDir != "" {
+ src.Dir = dst.dataDir
+ }
+
+ proto := "http://"
+ if dst.peerAddr != "" {
+ peer, err := url.Parse(proto + dst.peerAddr)
+ if err != nil {
+ log.Errorf(err, "parse peer listener '%s' failed",
dst.peerAddr)
+ return err
+ }
+ src.APUrls = []url.URL{*peer}
+ src.LPUrls = []url.URL{*peer}
+ src.InitialCluster = src.Name + "=" + peer.String()
+ }
+
+ if len(dst.peers) > 0 {
+ initialCluster := ""
+ for key, val := range dst.peers {
+ initialCluster += key + "=" + proto + val + ","
+ }
+ src.InitialCluster = initialCluster[:len(initialCluster)-1]
+ }
+ return nil
+}
diff --git a/syncer/pkg/mock/mocksotrage/etcd.go
b/syncer/pkg/mock/mocksotrage/etcd.go
index 576af19..c53e3ad 100644
--- a/syncer/pkg/mock/mocksotrage/etcd.go
+++ b/syncer/pkg/mock/mocksotrage/etcd.go
@@ -20,8 +20,6 @@ package mocksotrage
import (
"context"
"errors"
- "fmt"
- "net/url"
"os"
"github.com/apache/servicecomb-service-center/syncer/etcd"
@@ -31,15 +29,18 @@ import (
const (
defaultName = "etcd_mock"
defaultDataDir = "mock-data/"
- defaultListenPeerAddr = "http://127.0.0.1:30993"
+ defaultListenPeerAddr = "127.0.0.1:30993"
)
type MockServer struct {
- etcd *etcd.Agent
+ etcd *etcd.Server
}
func NewKVServer() (svr *MockServer, err error) {
- agent := etcd.NewAgent(defaultConfig())
+ agent, err1 := etcd.NewServer(defaultOptions()...)
+ if err1 != nil {
+ return nil, err
+ }
go agent.Start(context.Background())
select {
case <-agent.Ready():
@@ -58,13 +59,10 @@ func (m *MockServer) Stop() {
os.RemoveAll(defaultDataDir)
}
-func defaultConfig() *etcd.Config {
- peer, _ := url.Parse(defaultListenPeerAddr)
- conf := etcd.DefaultConfig()
- conf.Name = defaultName
- conf.Dir = defaultDataDir + defaultName
- conf.APUrls = []url.URL{*peer}
- conf.LPUrls = []url.URL{*peer}
- conf.InitialCluster = fmt.Sprintf("%s=%s", defaultName,
defaultListenPeerAddr)
- return conf
+func defaultOptions() []etcd.Option {
+ return []etcd.Option{
+ etcd.WithPeerAddr(defaultListenPeerAddr),
+ etcd.WithName(defaultName),
+ etcd.WithDataDir(defaultDataDir + defaultName),
+ }
}
diff --git a/syncer/server/convert.go b/syncer/server/convert.go
index 45e3024..d9d2ccc 100644
--- a/syncer/server/convert.go
+++ b/syncer/server/convert.go
@@ -19,7 +19,6 @@ package server
import (
"crypto/tls"
- "net/url"
"strings"
"time"
@@ -49,20 +48,12 @@ func convertSerfConfig(c *config.Config) *serf.Config {
return conf
}
-func convertEtcdConfig(c *config.Config) *etcd.Config {
- conf := etcd.DefaultConfig()
- conf.Name = c.Node
- conf.Dir = c.DataDir
- proto := "http://"
-
- if c.Listener.TLSMount.Enabled {
- proto = "https://"
+func convertEtcdOptions(c *config.Config) []etcd.Option {
+ return []etcd.Option{
+ etcd.WithName(c.Node),
+ etcd.WithDataDir(c.DataDir),
+ etcd.WithPeerAddr(c.Listener.PeerAddr),
}
-
- peer, _ := url.Parse(proto + c.Listener.PeerAddr)
- conf.APUrls = []url.URL{*peer}
- conf.LPUrls = []url.URL{*peer}
- return conf
}
func convertTaskOptions(c *config.Config) []task.Option {
diff --git a/syncer/server/server.go b/syncer/server/server.go
index 3d04767..5e81b19 100644
--- a/syncer/server/server.go
+++ b/syncer/server/server.go
@@ -14,13 +14,13 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package server
import (
"context"
"crypto/tls"
"errors"
- "net/url"
"strconv"
"syscall"
@@ -62,6 +62,8 @@ type moduleServer interface {
// Server struct for syncer
type Server struct {
+ ctx context.Context
+ cancel context.CancelFunc
// Syncer configuration
conf *config.Config
@@ -71,8 +73,7 @@ type Server struct {
// Wrap the servicecenter
servicecenter servicecenter.Servicecenter
- etcd *etcd.Agent
- etcdConf *etcd.Config
+ etcd *etcd.Server
// Wraps the serf agent
agent *serf.Agent
@@ -86,7 +87,10 @@ type Server struct {
// NewServer new server with Config
func NewServer(conf *config.Config) *Server {
+ ctx, cancel := context.WithCancel(context.Background())
return &Server{
+ ctx: ctx,
+ cancel: cancel,
conf: conf,
stopCh: make(chan struct{}),
}
@@ -158,6 +162,8 @@ func (s *Server) Stop() {
s.etcd.Stop()
}
+ s.cancel()
+
// Closes all goroutines in the pool
gopool.CloseAndWait()
}
@@ -191,8 +197,11 @@ func (s *Server) initialization() (err error) {
return
}
- s.etcdConf = convertEtcdConfig(s.conf)
- s.etcd = etcd.NewAgent(s.etcdConf)
+ s.etcd, err = etcd.NewServer(convertEtcdOptions(s.conf)...)
+ if err != nil {
+ log.Errorf(err, "Create etcd failed, %s", err)
+ return
+ }
s.task, err = task.GenerateTasker(s.conf.Task.Kind,
convertTaskOptions(s.conf)...)
if err != nil {
@@ -229,34 +238,15 @@ func (s *Server) initPlugin() {
// configureCluster Configuring the cluster by serf group member information
func (s *Server) configureCluster() error {
- proto := "http"
- if s.conf.Listener.TLSMount.Enabled {
- proto = "https"
- }
- initialCluster := ""
-
// get local member of serf
self := s.agent.LocalMember()
_, peerPort, _ := utils.SplitAddress(s.conf.Listener.PeerAddr)
- peerUrl, err := url.Parse(proto + "://" + self.Addr.String() + ":" +
strconv.Itoa(peerPort))
- if err != nil {
- log.Error("parse url from serf local member failed", err)
- return err
- }
+ ops := []etcd.Option{etcd.WithPeerAddr(self.Addr.String() + ":" +
strconv.Itoa(peerPort))}
// group members from serf as initial cluster members
for _, member := range s.agent.GroupMembers(s.conf.Cluster) {
- initialCluster += member.Name + "=" + proto + "://" +
member.Addr.String() + ":" + member.Tags[serf.TagKeyClusterPort] + ","
+ ops = append(ops, etcd.WithAddPeers(member.Name,
member.Addr.String()+":"+member.Tags[serf.TagKeyClusterPort]))
}
- leng := len(initialCluster)
- if leng == 0 {
- err = errors.New("serf group members is empty")
- log.Error("etcd peer not found", err)
- return err
- }
- s.etcdConf.APUrls = []url.URL{*peerUrl}
- s.etcdConf.LPUrls = []url.URL{*peerUrl}
- s.etcdConf.InitialCluster = initialCluster[:len(initialCluster)-1]
- return nil
+ return s.etcd.AddOptions(ops...)
}