This is an automated email from the ASF dual-hosted git repository.

mabin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/servicecomb-service-center.git


The following commit(s) were added to refs/heads/master by this push:
     new d843b7b  [SCB-1427] Move storage to syncer/servicecenter
     new eb503d0  Merge pull request #564 from ChinX/syncer
d843b7b is described below

commit d843b7b7290d2144b00b019320d6fe07852ce118
Author: chinx <[email protected]>
AuthorDate: Fri Aug 9 10:13:18 2019 +0800

    [SCB-1427] Move storage to syncer/servicecenter
---
 syncer/etcd/agent.go                              | 10 +---
 syncer/pkg/mock/mocksotrage/etcd.go               | 71 +++++++++++++++++++++++
 syncer/server/server.go                           |  2 +-
 syncer/servicecenter/servicecenter.go             | 19 ++----
 syncer/servicecenter/servicecenter_test.go        |  8 ++-
 syncer/{etcd => servicecenter/storage}/storage.go | 34 ++++++-----
 6 files changed, 109 insertions(+), 35 deletions(-)

diff --git a/syncer/etcd/agent.go b/syncer/etcd/agent.go
index 1330b48..a52a511 100644
--- a/syncer/etcd/agent.go
+++ b/syncer/etcd/agent.go
@@ -22,7 +22,7 @@ import (
        "errors"
 
        "github.com/apache/servicecomb-service-center/pkg/log"
-       "github.com/apache/servicecomb-service-center/syncer/servicecenter"
+       "github.com/coreos/etcd/clientv3"
        "github.com/coreos/etcd/embed"
        "github.com/coreos/etcd/etcdserver/api/v3client"
 )
@@ -31,7 +31,6 @@ import (
 type Agent struct {
        conf    *Config
        etcd    *embed.Etcd
-       storage servicecenter.Storage
        readyCh chan struct{}
        errorCh chan error
 }
@@ -84,11 +83,8 @@ func (a *Agent) Error() <-chan error {
 }
 
 // Storage returns etcd storage
-func (a *Agent) Storage() servicecenter.Storage {
-       if a.storage == nil {
-               a.storage = NewStorage(v3client.New(a.etcd.Server))
-       }
-       return a.storage
+func (a *Agent) Storage() *clientv3.Client {
+       return v3client.New(a.etcd.Server)
 }
 
 // Stop etcd agent
diff --git a/syncer/pkg/mock/mocksotrage/etcd.go 
b/syncer/pkg/mock/mocksotrage/etcd.go
new file mode 100644
index 0000000..3604a16
--- /dev/null
+++ b/syncer/pkg/mock/mocksotrage/etcd.go
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package mocksotrage
+
+import (
+       "context"
+       "fmt"
+       "net/url"
+       "os"
+
+       "github.com/apache/servicecomb-service-center/syncer/etcd"
+       "github.com/coreos/etcd/clientv3"
+)
+
+const (
+       defaultName           = "etcd_mock"
+       defaultDataDir        = "mock-data/"
+       defaultListenPeerAddr = "http://127.0.0.1:30993";
+)
+
+type MockServer struct {
+       etcd *etcd.Agent
+}
+
+func NewKVServer() (svr *MockServer, err error) {
+       agent := etcd.NewAgent(defaultConfig())
+       go agent.Start(context.Background())
+       select {
+       case <-agent.Ready():
+       case err = <-agent.Error():
+       }
+       if err != nil {
+               return nil, err
+       }
+       return &MockServer{agent}, nil
+}
+
+func (m *MockServer) Storage() *clientv3.Client {
+       return m.etcd.Storage()
+}
+
+func (m *MockServer) Stop() {
+       m.etcd.Stop()
+       os.RemoveAll(defaultDataDir)
+}
+
+func defaultConfig() *etcd.Config {
+       peer, _ := url.Parse(defaultListenPeerAddr)
+       conf := etcd.DefaultConfig()
+       conf.Name = defaultName
+       conf.Dir = defaultDataDir + defaultName
+       conf.APUrls = []url.URL{*peer}
+       conf.LPUrls = []url.URL{*peer}
+       conf.InitialCluster = fmt.Sprintf("%s=%s", defaultName, 
defaultListenPeerAddr)
+       return conf
+}
diff --git a/syncer/server/server.go b/syncer/server/server.go
index 213ba95..afed8d0 100644
--- a/syncer/server/server.go
+++ b/syncer/server/server.go
@@ -116,7 +116,7 @@ func (s *Server) Run(ctx context.Context) {
                return
        }
 
-       s.servicecenter.SetStorage(s.etcd.Storage())
+       s.servicecenter.SetStorageEngine(s.etcd.Storage())
 
        s.agent.RegisterEventHandler(s)
 
diff --git a/syncer/servicecenter/servicecenter.go 
b/syncer/servicecenter/servicecenter.go
index 63f2d29..c8a288a 100644
--- a/syncer/servicecenter/servicecenter.go
+++ b/syncer/servicecenter/servicecenter.go
@@ -23,11 +23,13 @@ import (
        "github.com/apache/servicecomb-service-center/pkg/log"
        "github.com/apache/servicecomb-service-center/syncer/plugins"
        pb "github.com/apache/servicecomb-service-center/syncer/proto"
+       
"github.com/apache/servicecomb-service-center/syncer/servicecenter/storage"
+       "github.com/coreos/etcd/clientv3"
 )
 
 // Store interface of servicecenter
 type Servicecenter interface {
-       SetStorage(storage Storage)
+       SetStorageEngine(engine clientv3.KV)
        FlushData()
        Registry(clusterName string, data *pb.SyncData)
        Discovery() *pb.SyncData
@@ -35,16 +37,7 @@ type Servicecenter interface {
 
 type servicecenter struct {
        servicecenter plugins.Servicecenter
-       storage       Storage
-}
-
-type Storage interface {
-       GetData() (data *pb.SyncData)
-       UpdateData(data *pb.SyncData)
-       GetMaps() (maps pb.SyncMapping)
-       UpdateMaps(maps pb.SyncMapping)
-       GetMapByCluster(clusterName string) (mapping pb.SyncMapping)
-       UpdateMapByCluster(clusterName string, mapping pb.SyncMapping)
+       storage       storage.Storage
 }
 
 // NewServicecenter new store with endpoints
@@ -59,8 +52,8 @@ func NewServicecenter(endpoints []string) (Servicecenter, 
error) {
        }, nil
 }
 
-func (s *servicecenter) SetStorage(storage Storage) {
-       s.storage = storage
+func (s *servicecenter) SetStorageEngine(engine clientv3.KV) {
+       s.storage = storage.NewStorage(engine)
 }
 
 // FlushData flush data to servicecenter, update mapping data
diff --git a/syncer/servicecenter/servicecenter_test.go 
b/syncer/servicecenter/servicecenter_test.go
index 120b109..cef2040 100644
--- a/syncer/servicecenter/servicecenter_test.go
+++ b/syncer/servicecenter/servicecenter_test.go
@@ -57,7 +57,13 @@ func TestOnEvent(t *testing.T) {
                t.Fatal(err)
                return
        }
-       dc.SetStorage(mocksotrage.New())
+       mockServer, err := mocksotrage.NewKVServer()
+       if err != nil {
+               t.Fatal(err)
+               return
+       }
+       defer mockServer.Stop()
+       dc.SetStorageEngine(mockServer.Storage())
 
        mockplugin.SetGetAll(func(ctx context.Context) (data *pb.SyncData, e 
error) {
                return nil, errors.New("test error")
diff --git a/syncer/etcd/storage.go b/syncer/servicecenter/storage/storage.go
similarity index 90%
rename from syncer/etcd/storage.go
rename to syncer/servicecenter/storage/storage.go
index f77af90..c6abf44 100644
--- a/syncer/etcd/storage.go
+++ b/syncer/servicecenter/storage/storage.go
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package etcd
+package storage
 
 import (
        "context"
@@ -24,7 +24,6 @@ import (
        "github.com/apache/servicecomb-service-center/pkg/log"
        "github.com/apache/servicecomb-service-center/pkg/util"
        pb "github.com/apache/servicecomb-service-center/syncer/proto"
-       "github.com/apache/servicecomb-service-center/syncer/servicecenter"
        "github.com/coreos/etcd/clientv3"
        "github.com/gogo/protobuf/proto"
 )
@@ -38,15 +37,24 @@ var (
        instancesKey = "/syncer/v1/instances"
 )
 
+type Storage interface {
+       GetData() (data *pb.SyncData)
+       UpdateData(data *pb.SyncData)
+       GetMaps() (maps pb.SyncMapping)
+       UpdateMaps(maps pb.SyncMapping)
+       GetMapByCluster(clusterName string) (mapping pb.SyncMapping)
+       UpdateMapByCluster(clusterName string, mapping pb.SyncMapping)
+}
+
 type storage struct {
-       client *clientv3.Client
+       engine clientv3.KV
        data   *pb.SyncData
        lock   sync.RWMutex
 }
 
-func NewStorage(client *clientv3.Client) servicecenter.Storage {
+func NewStorage(engine clientv3.KV) Storage {
        storage := &storage{
-               client: client,
+               engine: engine,
                data:   &pb.SyncData{},
        }
        return storage
@@ -54,7 +62,7 @@ func NewStorage(client *clientv3.Client) 
servicecenter.Storage {
 
 // getPrefixKey Get data from etcd based on the prefix key
 func (s *storage) getPrefixKey(prefix string, handler func(key, val []byte) 
(next bool)) {
-       resp, err := s.client.Get(context.Background(), prefix, 
clientv3.WithPrefix())
+       resp, err := s.engine.Get(context.Background(), prefix, 
clientv3.WithPrefix())
        if err != nil {
                log.Errorf(err, "Get mapping from etcd failed: %s", err)
                return
@@ -125,7 +133,7 @@ next:
                        }
                }
                key := mappingsKey + "/" + entry.ClusterName + "/" + 
entry.OrgInstanceID
-               if _, err := s.client.Delete(context.Background(), key); err != 
nil {
+               if _, err := s.engine.Delete(context.Background(), key); err != 
nil {
                        log.Errorf(err, "Delete instance clusterName=%s 
instanceID=%s failed", entry.ClusterName, entry.OrgInstanceID)
                }
 
@@ -142,7 +150,7 @@ func (s *storage) UpdateServices(services 
[]*pb.SyncService) {
                        log.Errorf(err, "Proto marshal failed: %s", err)
                        continue
                }
-               _, err = s.client.Put(context.Background(), key, 
util.BytesToStringWithNoCopy(data))
+               _, err = s.engine.Put(context.Background(), key, 
util.BytesToStringWithNoCopy(data))
                if err != nil {
                        log.Errorf(err, "Save service to etcd failed: %s", err)
                }
@@ -175,7 +183,7 @@ func (s *storage) DeleteServices(services 
[]*pb.SyncService) {
 // DeleteServices Delete services from storage
 func (s *storage) deleteService(serviceId string) {
        key := servicesKey + "/" + serviceId
-       _, err := s.client.Delete(context.Background(), key)
+       _, err := s.engine.Delete(context.Background(), key)
        if err != nil {
                log.Errorf(err, "Delete service from etcd failed: %s", err)
        }
@@ -190,7 +198,7 @@ func (s *storage) UpdateInstances(instances 
[]*pb.SyncInstance) {
                        log.Errorf(err, "Proto marshal failed: %s", err)
                        continue
                }
-               _, err = s.client.Put(context.Background(), key, 
util.BytesToStringWithNoCopy(data))
+               _, err = s.engine.Put(context.Background(), key, 
util.BytesToStringWithNoCopy(data))
                if err != nil {
                        log.Errorf(err, "Save instance to etcd failed: %s", err)
                }
@@ -222,7 +230,7 @@ func (s *storage) DeleteInstances(instances 
[]*pb.SyncInstance) {
 
 func (s *storage) deleteInstance(instanceID string) {
        key := instancesKey + "/" + instanceID
-       _, err := s.client.Delete(context.Background(), key)
+       _, err := s.engine.Delete(context.Background(), key)
        if err != nil {
                log.Errorf(err, "Delete instance from etcd failed: %s", err)
        }
@@ -238,7 +246,7 @@ func (s *storage) UpdateMapByCluster(clusterName string, 
mapping pb.SyncMapping)
                        log.Errorf(err, "Proto marshal failed: %s", err)
                        continue
                }
-               _, err = s.client.Put(context.Background(), key, 
util.BytesToStringWithNoCopy(data))
+               _, err = s.engine.Put(context.Background(), key, 
util.BytesToStringWithNoCopy(data))
                if err != nil {
                        log.Errorf(err, "Save mapping to etcd failed: %s", err)
                }
@@ -275,7 +283,7 @@ func (s *storage) UpdateMaps(maps pb.SyncMapping) {
                        log.Errorf(err, "Proto marshal failed: %s", err)
                        continue
                }
-               _, err = s.client.Put(context.Background(), key, 
util.BytesToStringWithNoCopy(data))
+               _, err = s.engine.Put(context.Background(), key, 
util.BytesToStringWithNoCopy(data))
                if err != nil {
                        log.Errorf(err, "Save mapping to etcd failed: %s", err)
                        continue

Reply via email to